You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ss...@apache.org on 2013/03/19 21:29:41 UTC

[1/2] git commit: GIRAPH-480 Add convergence detection to org.apache.giraph.examples.RandomWalkVertex

GIRAPH-480 Add convergence detection to org.apache.giraph.examples.RandomWalkVertex


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc2026fa
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc2026fa
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc2026fa

Branch: refs/heads/trunk
Commit: fc2026fa1ac5225a82e47c4968801d5140f6bfa4
Parents: 40e201d
Author: ssc <ss...@apache.org>
Authored: Sat Mar 9 11:36:43 2013 +0100
Committer: ssc <ss...@apache.org>
Committed: Tue Mar 19 08:58:43 2013 +0100

----------------------------------------------------------------------
 giraph-examples/pom.xml                            |    1 +
 .../LongDoubleNullDoubleTextInputFormat.java       |  106 +++++++++++++++
 .../org/apache/giraph/examples/PageRankVertex.java |   56 ++++++++
 .../apache/giraph/examples/RandomWalkVertex.java   |   98 ++++++++++----
 .../examples/RandomWalkWithRestartVertex.java      |   36 +++---
 .../giraph/examples/RandomWalkWorkerContext.java   |   26 +++--
 ...texWithDoubleValueNullEdgeTextOutputFormat.java |   59 ++++++++
 .../apache/giraph/examples/PageRankVertexTest.java |   89 ++++++++++++
 .../giraph/examples/RandomWalkTestUtils.java       |   46 +++++++
 .../examples/RandomWalkWithRestartVertexTest.java  |   46 +++----
 10 files changed, 480 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 7a18711..6adcdcd 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -53,6 +53,7 @@ under the License.
           <headerLocation>license-header.txt</headerLocation>
           <failOnViolation>true</failOnViolation>
           <includeTestSourceDirectory>false</includeTestSourceDirectory>
+          <consoleOutput>true</consoleOutput>
         </configuration>
         <executions>
           <execution>

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
new file mode 100644
index 0000000..e22194a
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Input format for unweighted graphs with long ids and double vertex values
+ */
+public class LongDoubleNullDoubleTextInputFormat
+    extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable,
+    DoubleWritable>
+    implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
+    NullWritable, DoubleWritable> {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
+      NullWritable, DoubleWritable> conf;
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+                                             TaskAttemptContext context)
+    throws IOException {
+    return new LongDoubleNullDoubleVertexReader();
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
+      DoubleWritable, NullWritable, DoubleWritable> configuration) {
+    this.conf = configuration;
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
+      NullWritable, DoubleWritable> getConf() {
+    return conf;
+  }
+
+  /**
+   * Vertex reader associated with
+   * {@link LongDoubleNullDoubleTextInputFormat}.
+   */
+  public class LongDoubleNullDoubleVertexReader extends
+      TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable,
+          DoubleWritable>.TextVertexReader {
+    /** Separator of the vertex and neighbors: one or more tabs/spaces */
+    private final Pattern separator = Pattern.compile("[\t ]+");
+
+    @Override
+    public Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+          vertex = conf.createVertex();
+
+      String line = getRecordReader().getCurrentValue().toString().trim();
+      String[] tokens = separator.split(line);
+      List<Edge<LongWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.add(EdgeFactory.create(
+            new LongWritable(Long.parseLong(tokens[n])),
+            NullWritable.get()));
+      }
+
+      LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+      vertex.initialize(vertexId, new DoubleWritable(), edges);
+
+      return vertex;
+    }
+
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
new file mode 100644
index 0000000..733ee53
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * PageRank (http://en.wikipedia.org/wiki/PageRank): a random walk that
+ * follows each out-edge of a vertex with equal probability.
+ */
+public class PageRankVertex extends RandomWalkVertex<NullWritable> {
+
+  @Override
+  protected double transitionProbability(double stateProbability,
+      Edge<LongWritable, NullWritable> edge) {
+    return stateProbability / getNumEdges();
+  }
+
+  @Override
+  protected double recompute(Iterable<DoubleWritable> partialRanks,
+                             double teleportationProbability) {
+    // partialRanks: probability mass sent to this vertex along its in-edges
+    // rank contribution from incoming neighbors
+    double rankFromNeighbors = MathUtils.sum(partialRanks);
+    // dangling vertices' mass, redistributed uniformly over all vertices
+    double danglingContribution =
+        getDanglingProbability() / getTotalNumVertices();
+
+    // damped walk step plus uniform teleportation to any vertex
+    double rank = (1d - teleportationProbability) *
+        (rankFromNeighbors + danglingContribution) +
+        teleportationProbability / getTotalNumVertices();
+
+    return rank;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
index 8196523..85c6e27 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
@@ -24,34 +24,53 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
 
 /**
- * Base class for executing a random walk on the graph
+ * Base class for executing a random walk on a graph
+ *
+ * @param <E> edge type
  */
-public abstract class RandomWalkVertex
-    extends Vertex<LongWritable, DoubleWritable, DoubleWritable,
-    DoubleWritable> {
+public abstract class RandomWalkVertex<E extends Writable>
+    extends Vertex<LongWritable, DoubleWritable, E, DoubleWritable> {
   /** Configuration parameter for the number of supersteps to execute */
   static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() +
       ".maxSupersteps";
   /** Configuration parameter for the teleportation probability */
   static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
       .getName() + ".teleportationProbability";
-  /** Name of aggregator for dangling nodes */
-  static final String DANGLING = "dangling";
+  /** Name of aggregator for collecting the probability of dangling vertices */
+  static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
+      .getName() + ".cumulativeDanglingProbability";
+  /** Name of aggregator for the L1 norm of the probability difference, used
+   * for convergence detection */
+  static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
+      .getName() + ".l1NormOfProbabilityDifference";
   /** Logger */
   private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
-  /** State probability of the vertex */
-  protected final DoubleWritable d = new DoubleWritable();
+  /** Reusable writable for aggregates/messages; never the vertex value */
+  private final DoubleWritable doubleWritable = new DoubleWritable();
 
   /**
-   * Compute an initial probability distribution for the vertex.
+   * Compute an initial probability value for the vertex. By default,
+   * we start with a uniform distribution.
    * @return The initial probability value.
    */
-  protected abstract double initialProbability();
+  protected double initialProbability() {
+    return 1.0 / getTotalNumVertices();
+  }
+
+  /**
+   * Compute the probability of transitioning to a neighbor vertex
+   * @param stateProbability current steady state probability of the vertex
+   * @param edge edge to neighbor
+   * @return the probability of transitioning to a neighbor vertex
+   */
+  protected abstract double transitionProbability(double stateProbability,
+      Edge<LongWritable, E> edge);
 
   /**
    * Perform a single step of a random walk computation.
@@ -63,30 +82,43 @@ public abstract class RandomWalkVertex
   protected abstract double recompute(Iterable<DoubleWritable> messages,
       double teleportationProbability);
 
+  /**
+   * Returns the cumulative probability from dangling nodes.
+   * @return The cumulative probability from dangling nodes.
+   */
+  protected double getDanglingProbability() {
+    return this.<DoubleWritable>getAggregatedValue(
+        RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+  }
+
   @Override
   public void compute(Iterable<DoubleWritable> messages) throws IOException {
     double stateProbability;
 
     if (getSuperstep() > 0) {
+      double previousStateProbability = getValue().get();
       stateProbability = recompute(messages, teleportationProbability());
+
+      doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
+      aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
+
     } else {
       stateProbability = initialProbability();
     }
-    d.set(stateProbability);
-    setValue(d);
+    // fresh writable: doubleWritable is overwritten below for each message
+    setValue(new DoubleWritable(stateProbability));
 
     // Compute dangling node contribution for next superstep
     if (getNumEdges() == 0) {
-      aggregate(DANGLING, d);
+      aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue());
     }
 
-    // Execute the algorithm as often as configured,
-    // alternatively convergence could be checked via an Aggregator
     if (getSuperstep() < maxSupersteps()) {
-      for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
-        double transitionProbability = stateProbability * edge.getValue().get();
-        sendMessage(edge.getTargetVertexId(), new DoubleWritable(
-            transitionProbability));
+      for (Edge<LongWritable, E> edge : getEdges()) {
+        double transitionProbability =
+            transitionProbability(stateProbability, edge);
+        doubleWritable.set(transitionProbability);
+        sendMessage(edge.getTargetVertexId(), doubleWritable);
       }
     } else {
       voteToHalt();
@@ -116,20 +148,38 @@ public abstract class RandomWalkVertex
    */
   public static class RandomWalkVertexMasterCompute extends
       DefaultMasterCompute {
+
+    /** Threshold for the L1 norm of the state vector difference */
+    static final double CONVERGENCE_THRESHOLD = 0.00001;
+
     @Override
     public void compute() {
-      // TODO This is a good place to implement halting by checking convergence.
       double danglingContribution =
-          this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING)
-              .get();
+          this.<DoubleWritable>getAggregatedValue(
+          RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+      double l1NormOfStateDiff =
+          this.<DoubleWritable>getAggregatedValue(
+          RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+
       LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
-          danglingContribution);
+          danglingContribution + ", L1 Norm of state vector difference = " +
+          l1NormOfStateDiff);
+
+      // Convergence check: halt once the L1 norm of the difference between the
+      // state vectors falls below the threshold (superstep 0 aggregates none)
+      if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
+        haltComputation();
+      }
+
     }
 
     @Override
     public void initialize() throws InstantiationException,
         IllegalAccessException {
-      registerAggregator(RandomWalkVertex.DANGLING, DoubleSumAggregator.class);
+      registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+          DoubleSumAggregator.class);
+      registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+          DoubleSumAggregator.class);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
index 8a689ed..6f3eb6c 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
@@ -18,15 +18,19 @@
 
 package org.apache.giraph.examples;
 
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
 import org.apache.giraph.utils.MathUtils;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
 
 /**
  * Executes "RandomWalkWithRestart", a random walk on the graph which is biased
  * towards a source vertex. The resulting probabilities of staying at a given
  * vertex can be interpreted as a measure of proximity to the source vertex.
  */
-public class RandomWalkWithRestartVertex extends RandomWalkVertex {
+public class RandomWalkWithRestartVertex
+    extends RandomWalkVertex<DoubleWritable> {
 
   /** Configuration parameter for the source vertex */
   static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
@@ -42,34 +46,26 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex {
   }
 
   /**
-   * Returns the number of source vertexes.
-   * @return The number of source vertexes.
+   * Returns the number of source vertices.
+   * @return The number of source vertices.
    */
-  private int numSourceVertexes() {
+  private int numSourceVertices() {
     return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
   }
 
-  /**
-   * Returns the cumulated probability from dangling nodes.
-   * @return The cumulated probability from dangling nodes.
-   */
-  private double getDanglingProbability() {
-    return this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING)
-        .get();
-  }
-
-  /**
-   * Start with a uniform distribution.
-   * @return A uniform probability over all the vertexces.
-   */
   @Override
-  protected double initialProbability() {
-    return 1.0 / getTotalNumVertices();
+  protected double transitionProbability(double stateProbability,
+      Edge<LongWritable, DoubleWritable> edge) {
+    return stateProbability * edge.getValue().get();
   }
 
   @Override
   protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
       double teleportationProbability) {
+    // teleportation targets the source vertices, so at least one must exist
+    int numSourceVertices = numSourceVertices();
+    Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
+
     double stateProbability = MathUtils.sum(transitionProbabilities);
     // Add the contribution of dangling nodes (weakly preferential
     // implementation: dangling nodes redistribute uniformly)
@@ -77,7 +73,7 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex {
     // The random walk might teleport back to one of the source vertexes
     stateProbability *= 1 - teleportationProbability;
     if (isSourceVertex()) {
-      stateProbability += teleportationProbability / numSourceVertexes();
+      stateProbability += teleportationProbability / numSourceVertices;
     }
     return stateProbability;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
index 5cff23f..2566f43 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
@@ -103,18 +103,25 @@ public class RandomWalkWorkerContext extends WorkerContext {
    * Second option is a file with a list of vertex IDs, one per line. In this
    * second case the preference vector is a uniform distribution over these
    * vertexes.
-   * @param configuration
-   *          The configuration.
+   * @param configuration The configuration.
+   * @return a (possibly empty) set of source vertices
    */
-  private void initializeSources(Configuration configuration) {
+  private ImmutableSet<Long> initializeSources(Configuration configuration) {
     ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
     long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
     if (sourceVertex != Long.MIN_VALUE) {
-      builder.add(sourceVertex);
+      return ImmutableSet.of(sourceVertex);
     } else {
       Path sourceFile = null;
       try {
-        sourceFile = DistributedCache.getLocalCacheFiles(configuration)[0];
+        // a null or empty cache-file list means no sources were configured
+        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
+        if (cacheFiles == null || cacheFiles.length == 0) {
+          // empty set if no source vertices configured
+          return ImmutableSet.of();
+        }
+        // NOTE(review): 'in' leaks if reading throws (close it in a finally)
+        sourceFile = cacheFiles[0];
         FileSystem fs = FileSystem.getLocal(configuration);
         BufferedReader in = new BufferedReader(new InputStreamReader(
             fs.open(sourceFile)));
@@ -124,25 +131,24 @@ public class RandomWalkWorkerContext extends WorkerContext {
         }
         in.close();
       } catch (IOException e) {
-        e.printStackTrace();
         getContext().setStatus(
             "Could not load local cache files: " + sourceFile);
-        LOG.error("Could not load local cache files: " + sourceFile);
+        LOG.error("Could not load local cache files: " + sourceFile, e);
       }
     }
-    SOURCES = builder.build();
+    return builder.build();
   }
 
   @Override
   public void preApplication() throws InstantiationException,
       IllegalAccessException {
-    Configuration configuration = this.getContext().getConfiguration();
+    Configuration configuration = getContext().getConfiguration();
     MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
         DEFAULT_MAX_SUPERSTEPS);
     TELEPORTATION_PROBABILITY = configuration.getFloat(
         RandomWalkVertex.TELEPORTATION_PROBABILITY,
         DEFAULT_TELEPORTATION_PROBABILITY);
-    initializeSources(configuration);
+    SOURCES = initializeSources(configuration);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
new file mode 100644
index 0000000..85f3556
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Output format writing one "id<TAB>value" line per vertex (long id, double
+ * value); edges are not written
+ */
+public class VertexWithDoubleValueNullEdgeTextOutputFormat extends
+    TextVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {
+  @Override
+  public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+    throws IOException, InterruptedException {
+    return new VertexWithDoubleValueWriter();
+  }
+
+  /**
+   * Vertex writer used with
+   * {@link VertexWithDoubleValueNullEdgeTextOutputFormat}.
+   */
+  public class VertexWithDoubleValueWriter extends TextVertexWriter {
+    @Override
+    public void writeVertex(
+        Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+      throws IOException, InterruptedException {
+      StringBuilder output = new StringBuilder();
+      output.append(vertex.getId().get());
+      output.append('\t');
+      output.append(vertex.getValue().get());
+      getRecordWriter().write(new Text(output.toString()), null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
new file mode 100644
index 0000000..9672d20
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests for {@link PageRankVertex}
+ */
+public class PageRankVertexTest {
+
+  /**
+   * A local integration test on toy data
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // A small graph
+    String[] graph = new String[] {
+      "1 4 2 3",
+      "2 1",
+      "3",
+      "4 3 2",
+      "5 2 4"
+    };
+
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkVertex.MAX_SUPERSTEPS, String.valueOf(50));
+    params.put(RandomWalkVertex.TELEPORTATION_PROBABILITY,
+        String.valueOf(0.15));
+
+    GiraphClasses<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+        classes = new GiraphClasses<LongWritable, DoubleWritable,
+                NullWritable, DoubleWritable>();
+    classes.setVertexClass(PageRankVertex.class);
+    classes.setVertexEdgesClass(ByteArrayEdges.class);
+    classes.setVertexInputFormatClass(
+        LongDoubleNullDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueNullEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(
+        RandomWalkVertex.RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        RandomWalkTestUtils.parseSteadyStateProbabilities(results);
+
+    assertEquals(0.28159076008518047, steadyStateProbabilities.get(1L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.2514648601529863, steadyStateProbabilities.get(2L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.22262961972286327, steadyStateProbabilities.get(3L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.17646783276703806, steadyStateProbabilities.get(4L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.06784692727193153, steadyStateProbabilities.get(5L),
+        RandomWalkTestUtils.EPSILON);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
new file mode 100644
index 0000000..71528a3
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+/** Helpers shared by the random walk vertex tests. */
+public final class RandomWalkTestUtils {
+  private RandomWalkTestUtils() { }
+  /** Assertion tolerance for doubles; note that 10e-3 == 0.01 */
+  public static final double EPSILON = 10e-3;
+
+  /**
+   * Parse steady state probabilities from "id<TAB>value" output lines.
+   * @param results The steady state probabilities in text format.
+   * @return A map representation of the steady state probabilities.
+   */
+  public static Map<Long, Double> parseSteadyStateProbabilities(
+      Iterable<String> results) {
+    Map<Long, Double> result = Maps.newHashMap();
+    for (String s : results) {
+      String[] tokens = s.split("\\t");
+      Long id = Long.parseLong(tokens[0]);
+      Double value = Double.parseDouble(tokens[1]);
+      result.put(id, value);
+    }
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
index 489b35a..1ae9c52 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -37,9 +37,6 @@ import static org.junit.Assert.assertEquals;
  */
 public class RandomWalkWithRestartVertexTest {
 
-  /** Minimum difference between doubles */
-  private static final double EPSILON = 10e-3;
-
   /**
    * A local integration test on toy data
    */
@@ -69,13 +66,15 @@ public class RandomWalkWithRestartVertexTest {
     Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
 
     Map<Long, Double> steadyStateProbabilities =
-        parseSteadyStateProbabilities(results);
+        RandomWalkTestUtils.parseSteadyStateProbabilities(results);
     // values computed with external software
     // 0.25, 0.354872, 0.09375, 0.301377
-    assertEquals(0.25, steadyStateProbabilities.get(12L), EPSILON);
-    assertEquals(0.354872, steadyStateProbabilities.get(34L), EPSILON);
-    assertEquals(0.09375, steadyStateProbabilities.get(56L), EPSILON);
-    assertEquals(0.301377, steadyStateProbabilities.get(78L), EPSILON);
+    assertEquals(0.25, steadyStateProbabilities.get(12L), RandomWalkTestUtils.EPSILON);
+    assertEquals(0.354872, steadyStateProbabilities.get(34L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.09375, steadyStateProbabilities.get(56L), RandomWalkTestUtils.EPSILON);
+    assertEquals(0.301377, steadyStateProbabilities.get(78L),
+        RandomWalkTestUtils.EPSILON);
   }
 
   /**
@@ -108,29 +107,18 @@ public class RandomWalkWithRestartVertexTest {
     Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
 
     Map<Long, Double> steadyStateProbabilities =
-        parseSteadyStateProbabilities(results);
+        RandomWalkTestUtils.parseSteadyStateProbabilities(results);
     // values computed with external software
     // 0.163365, 0.378932, 0.156886, 0.300816
-    assertEquals(0.163365, steadyStateProbabilities.get(12L), EPSILON);
-    assertEquals(0.378932, steadyStateProbabilities.get(34L), EPSILON);
-    assertEquals(0.156886, steadyStateProbabilities.get(56L), EPSILON);
-    assertEquals(0.300816, steadyStateProbabilities.get(78L), EPSILON);
+    assertEquals(0.163365, steadyStateProbabilities.get(12L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.378932, steadyStateProbabilities.get(34L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.156886, steadyStateProbabilities.get(56L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.300816, steadyStateProbabilities.get(78L),
+        RandomWalkTestUtils.EPSILON);
   }
 
-  /**
-   * Parse steady state probabilities.
-   * @param results The steady state probabilities in text format.
-   * @return A map representation of the steady state probabilities.
-   */
-  private Map<Long, Double> parseSteadyStateProbabilities(
-      Iterable<String> results) {
-    Map<Long, Double> result = Maps.newHashMap();
-    for (String s : results) {
-      String[] tokens = s.split("\\t");
-      Long id = Long.parseLong(tokens[0]);
-      Double value = Double.parseDouble(tokens[1]);
-      result.put(id, value);
-    }
-    return result;
-  }
+
 }