You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ss...@apache.org on 2013/03/19 21:29:41 UTC
[1/2] git commit: GIRAPH-480 Add convergence detection to
org.apache.giraph.examples.RandomWalkVertex
GIRAPH-480 Add convergence detection to org.apache.giraph.examples.RandomWalkVertex
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fc2026fa
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fc2026fa
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fc2026fa
Branch: refs/heads/trunk
Commit: fc2026fa1ac5225a82e47c4968801d5140f6bfa4
Parents: 40e201d
Author: ssc <ss...@apache.org>
Authored: Sat Mar 9 11:36:43 2013 +0100
Committer: ssc <ss...@apache.org>
Committed: Tue Mar 19 08:58:43 2013 +0100
----------------------------------------------------------------------
giraph-examples/pom.xml | 1 +
.../LongDoubleNullDoubleTextInputFormat.java | 106 +++++++++++++++
.../org/apache/giraph/examples/PageRankVertex.java | 56 ++++++++
.../apache/giraph/examples/RandomWalkVertex.java | 98 ++++++++++----
.../examples/RandomWalkWithRestartVertex.java | 36 +++---
.../giraph/examples/RandomWalkWorkerContext.java | 26 +++--
...texWithDoubleValueNullEdgeTextOutputFormat.java | 59 ++++++++
.../apache/giraph/examples/PageRankVertexTest.java | 89 ++++++++++++
.../giraph/examples/RandomWalkTestUtils.java | 46 +++++++
.../examples/RandomWalkWithRestartVertexTest.java | 46 +++----
10 files changed, 480 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 7a18711..6adcdcd 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -53,6 +53,7 @@ under the License.
<headerLocation>license-header.txt</headerLocation>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <consoleOutput>true</consoleOutput>
</configuration>
<executions>
<execution>
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
new file mode 100644
index 0000000..e22194a
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullDoubleTextInputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Input format for unweighted graphs with long ids and double vertex values
+ */
+public class LongDoubleNullDoubleTextInputFormat
+ extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable,
+ DoubleWritable>
+ implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
+ NullWritable, DoubleWritable> {
+ /** Configuration. */
+ private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
+ NullWritable, DoubleWritable> conf;
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new LongDoubleNullDoubleVertexReader();
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
+ DoubleWritable, NullWritable, DoubleWritable> configuration) {
+ this.conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
+ NullWritable, DoubleWritable> getConf() {
+ return conf;
+ }
+
+ /**
+ * Vertex reader associated with
+ * {@link LongDoubleNullDoubleTextInputFormat}.
+ */
+ public class LongDoubleNullDoubleVertexReader extends
+ TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable,
+ DoubleWritable>.TextVertexReader {
+ /** Separator of the vertex and neighbors */
+ private final Pattern separator = Pattern.compile("[\t ]");
+
+ @Override
+ public Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+ getCurrentVertex() throws IOException, InterruptedException {
+ Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+ vertex = conf.createVertex();
+
+ String[] tokens =
+ separator.split(getRecordReader().getCurrentValue().toString());
+ List<Edge<LongWritable, NullWritable>> edges =
+ Lists.newArrayListWithCapacity(tokens.length - 1);
+ for (int n = 1; n < tokens.length; n++) {
+ edges.add(EdgeFactory.create(
+ new LongWritable(Long.parseLong(tokens[n])),
+ NullWritable.get()));
+ }
+
+ LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
+ vertex.initialize(vertexId, new DoubleWritable(), edges);
+
+ return vertex;
+ }
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
new file mode 100644
index 0000000..733ee53
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * PageRank (http://en.wikipedia.org/wiki/PageRank) expressed as a random walk
+ * in which a vertex forwards its probability to each of its out-neighbors in
+ * equal shares.
+ */
+public class PageRankVertex extends RandomWalkVertex<NullWritable> {
+
+  /**
+   * Gives every out-edge the same share of this vertex's probability.
+   *
+   * @param stateProbability current probability of this vertex
+   * @param edge out-edge being followed (its value is not consulted)
+   * @return stateProbability divided by the number of out-edges
+   */
+  @Override
+  protected double transitionProbability(double stateProbability,
+      Edge<LongWritable, NullWritable> edge) {
+    double outDegree = getNumEdges();
+    return stateProbability / outDegree;
+  }
+
+  /**
+   * Computes the new rank. With probability
+   * {@code 1 - teleportationProbability} the walk continues and collects the
+   * partial ranks received from in-neighbors plus a uniform share of the
+   * dangling probability. Otherwise it jumps to a uniformly chosen vertex.
+   *
+   * @param partialRanks probabilities received from in-neighbors
+   * @param teleportationProbability probability of a random jump
+   * @return the new rank of this vertex
+   */
+  @Override
+  protected double recompute(Iterable<DoubleWritable> partialRanks,
+      double teleportationProbability) {
+    double numVertices = getTotalNumVertices();
+    double walkProbability = 1d - teleportationProbability;
+    // mass arriving over in-edges plus the evenly spread dangling mass
+    double incoming = MathUtils.sum(partialRanks) +
+        getDanglingProbability() / numVertices;
+    return walkProbability * incoming +
+        teleportationProbability / numVertices;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
index 8196523..85c6e27 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
@@ -24,34 +24,53 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import java.io.IOException;
/**
- * Base class for executing a random walk on the graph
+ * Base class for executing a random walk on a graph
+ *
+ * @param <E> edge type
*/
-public abstract class RandomWalkVertex
- extends Vertex<LongWritable, DoubleWritable, DoubleWritable,
- DoubleWritable> {
+public abstract class RandomWalkVertex<E extends Writable>
+ extends Vertex<LongWritable, DoubleWritable, E, DoubleWritable> {
/** Configuration parameter for the number of supersteps to execute */
static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() +
".maxSupersteps";
/** Configuration parameter for the teleportation probability */
static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
.getName() + ".teleportationProbability";
- /** Name of aggregator for dangling nodes */
- static final String DANGLING = "dangling";
+ /**
+ * Name of the (sum) aggregator collecting the probability of dangling
+ * vertices, i.e. vertices without out-edges.
+ */
+ static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
+ .getName() + ".cumulativeDanglingProbability";
+ /**
+ * Name of the (sum) aggregator for the L1 norm of the difference between a
+ * vertex's recomputed probability and its previous value, used for
+ * convergence detection.
+ */
+ static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
+ .getName() + ".l1NormOfProbabilityDifference";
/** Logger */
private static final Logger LOG = Logger.getLogger(RandomWalkVertex.class);
- /** State probability of the vertex */
- protected final DoubleWritable d = new DoubleWritable();
+ /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
+ private final DoubleWritable doubleWritable = new DoubleWritable();
/**
- * Compute an initial probability distribution for the vertex.
+ * Compute an initial probability value for the vertex. Per default,
+ * we start with a uniform distribution.
* @return The initial probability value.
*/
- protected abstract double initialProbability();
+ protected double initialProbability() {
+ return 1.0 / getTotalNumVertices();
+ }
+
+ /**
+ * Compute the probability of transitioning to a neighbor vertex.
+ * {@link #compute(Iterable)} calls this once per out-edge (only while the
+ * superstep limit has not been reached) and sends the returned value to the
+ * edge's target vertex as a message.
+ * @param stateProbability current steady state probability of the vertex
+ * @param edge edge to neighbor
+ * @return the probability of transitioning to a neighbor vertex
+ */
+ protected abstract double transitionProbability(double stateProbability,
+ Edge<LongWritable, E> edge);
/**
* Perform a single step of a random walk computation.
@@ -63,30 +82,43 @@ public abstract class RandomWalkVertex
protected abstract double recompute(Iterable<DoubleWritable> messages,
double teleportationProbability);
+  /**
+   * Reads the value of the {@code CUMULATIVE_DANGLING_PROBABILITY}
+   * aggregator, into which vertices without out-edges aggregate their
+   * probability.
+   * @return The cumulative probability from dangling nodes.
+   */
+  protected double getDanglingProbability() {
+    DoubleWritable danglingMass = this.<DoubleWritable>getAggregatedValue(
+        CUMULATIVE_DANGLING_PROBABILITY);
+    return danglingMass.get();
+  }
+
@Override
public void compute(Iterable<DoubleWritable> messages) throws IOException {
double stateProbability;
if (getSuperstep() > 0) {
+ double previousStateProbability = getValue().get();
stateProbability = recompute(messages, teleportationProbability());
+
+ doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
+ aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
+
} else {
stateProbability = initialProbability();
}
- d.set(stateProbability);
- setValue(d);
+ doubleWritable.set(stateProbability);
+ setValue(doubleWritable);
// Compute dangling node contribution for next superstep
if (getNumEdges() == 0) {
- aggregate(DANGLING, d);
+ aggregate(CUMULATIVE_DANGLING_PROBABILITY, doubleWritable);
}
- // Execute the algorithm as often as configured,
- // alternatively convergence could be checked via an Aggregator
if (getSuperstep() < maxSupersteps()) {
- for (Edge<LongWritable, DoubleWritable> edge : getEdges()) {
- double transitionProbability = stateProbability * edge.getValue().get();
- sendMessage(edge.getTargetVertexId(), new DoubleWritable(
- transitionProbability));
+ for (Edge<LongWritable, E> edge : getEdges()) {
+ double transitionProbability =
+ transitionProbability(stateProbability, edge);
+ doubleWritable.set(transitionProbability);
+ sendMessage(edge.getTargetVertexId(), doubleWritable);
}
} else {
voteToHalt();
@@ -116,20 +148,38 @@ public abstract class RandomWalkVertex
*/
public static class RandomWalkVertexMasterCompute extends
DefaultMasterCompute {
+
+ /** threshold for the L1 norm of the state vector difference */
+ static final double CONVERGENCE_THRESHOLD = 0.00001;
+
@Override
public void compute() {
- // TODO This is a good place to implement halting by checking convergence.
double danglingContribution =
- this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING)
- .get();
+ this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+ double l1NormOfStateDiff =
+ this.<DoubleWritable>getAggregatedValue(
+ RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+
LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
- danglingContribution);
+ danglingContribution + ", L1 Norm of state vector difference = " +
+ l1NormOfStateDiff);
+
+ // Convergence check: halt once the L1 norm of the difference between the
+ // state vectors fall under the threshold
+ if (getSuperstep() > 1 && l1NormOfStateDiff < CONVERGENCE_THRESHOLD) {
+ haltComputation();
+ }
+
}
@Override
public void initialize() throws InstantiationException,
IllegalAccessException {
- registerAggregator(RandomWalkVertex.DANGLING, DoubleSumAggregator.class);
+ registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+ DoubleSumAggregator.class);
+ registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+ DoubleSumAggregator.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
index 8a689ed..6f3eb6c 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
@@ -18,15 +18,19 @@
package org.apache.giraph.examples;
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
import org.apache.giraph.utils.MathUtils;
import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
/**
* Executes "RandomWalkWithRestart", a random walk on the graph which is biased
* towards a source vertex. The resulting probabilities of staying at a given
* vertex can be interpreted as a measure of proximity to the source vertex.
*/
-public class RandomWalkWithRestartVertex extends RandomWalkVertex {
+public class RandomWalkWithRestartVertex
+ extends RandomWalkVertex<DoubleWritable> {
/** Configuration parameter for the source vertex */
static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
@@ -42,34 +46,26 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex {
}
/**
- * Returns the number of source vertexes.
- * @return The number of source vertexes.
+ * Returns the number of source vertices.
+ * @return The number of source vertices.
*/
- private int numSourceVertexes() {
+ private int numSourceVertices() {
return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
}
- /**
- * Returns the cumulated probability from dangling nodes.
- * @return The cumulated probability from dangling nodes.
- */
- private double getDanglingProbability() {
- return this.<DoubleWritable>getAggregatedValue(RandomWalkVertex.DANGLING)
- .get();
- }
-
- /**
- * Start with a uniform distribution.
- * @return A uniform probability over all the vertexces.
- */
@Override
- protected double initialProbability() {
- return 1.0 / getTotalNumVertices();
+ protected double transitionProbability(double stateProbability,
+ Edge<LongWritable, DoubleWritable> edge) {
+ return stateProbability * edge.getValue().get();
}
@Override
protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
double teleportationProbability) {
+
+ int numSourceVertices = numSourceVertices();
+ Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
+
double stateProbability = MathUtils.sum(transitionProbabilities);
// Add the contribution of dangling nodes (weakly preferential
// implementation: dangling nodes redistribute uniformly)
@@ -77,7 +73,7 @@ public class RandomWalkWithRestartVertex extends RandomWalkVertex {
// The random walk might teleport back to one of the source vertexes
stateProbability *= 1 - teleportationProbability;
if (isSourceVertex()) {
- stateProbability += teleportationProbability / numSourceVertexes();
+ stateProbability += teleportationProbability / numSourceVertices;
}
return stateProbability;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
index 5cff23f..2566f43 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
@@ -103,18 +103,25 @@ public class RandomWalkWorkerContext extends WorkerContext {
* Second option is a file with a list of vertex IDs, one per line. In this
* second case the preference vector is a uniform distribution over these
* vertexes.
- * @param configuration
- * The configuration.
+ * @param configuration The configuration.
+ * @return a (possibly empty) set of source vertices
*/
- private void initializeSources(Configuration configuration) {
+ private ImmutableSet<Long> initializeSources(Configuration configuration) {
ImmutableSet.Builder<Long> builder = ImmutableSet.builder();
long sourceVertex = configuration.getLong(SOURCE_VERTEX, Long.MIN_VALUE);
if (sourceVertex != Long.MIN_VALUE) {
- builder.add(sourceVertex);
+ return ImmutableSet.of(sourceVertex);
} else {
Path sourceFile = null;
try {
- sourceFile = DistributedCache.getLocalCacheFiles(configuration)[0];
+
+ Path[] cacheFiles = DistributedCache.getLocalCacheFiles(configuration);
+ if (cacheFiles == null || cacheFiles.length == 0) {
+ // empty set if no source vertices configured
+ return ImmutableSet.of();
+ }
+
+ sourceFile = cacheFiles[0];
FileSystem fs = FileSystem.getLocal(configuration);
BufferedReader in = new BufferedReader(new InputStreamReader(
fs.open(sourceFile)));
@@ -124,25 +131,24 @@ public class RandomWalkWorkerContext extends WorkerContext {
}
in.close();
} catch (IOException e) {
- e.printStackTrace();
getContext().setStatus(
"Could not load local cache files: " + sourceFile);
- LOG.error("Could not load local cache files: " + sourceFile);
+ LOG.error("Could not load local cache files: " + sourceFile, e);
}
}
- SOURCES = builder.build();
+ return builder.build();
}
@Override
public void preApplication() throws InstantiationException,
IllegalAccessException {
- Configuration configuration = this.getContext().getConfiguration();
+ Configuration configuration = getContext().getConfiguration();
MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
DEFAULT_MAX_SUPERSTEPS);
TELEPORTATION_PROBABILITY = configuration.getFloat(
RandomWalkVertex.TELEPORTATION_PROBABILITY,
DEFAULT_TELEPORTATION_PROBABILITY);
- initializeSources(configuration);
+ SOURCES = initializeSources(configuration);
}
@Override
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
new file mode 100644
index 0000000..85f3556
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * Output format for vertices with a long as id, a double as value and
+ * null edges
+ */
+public class VertexWithDoubleValueNullEdgeTextOutputFormat extends
+ TextVertexOutputFormat<LongWritable, DoubleWritable, NullWritable> {
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new VertexWithDoubleValueWriter();
+ }
+
+ /**
+ * Vertex writer used with
+ * {@link VertexWithDoubleValueNullEdgeTextOutputFormat}.
+ */
+ public class VertexWithDoubleValueWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(
+ Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+ throws IOException, InterruptedException {
+ StringBuilder output = new StringBuilder();
+ output.append(vertex.getId().get());
+ output.append('\t');
+ output.append(vertex.getValue().get());
+ getRecordWriter().write(new Text(output.toString()), null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
new file mode 100644
index 0000000..9672d20
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/PageRankVertexTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Tests for {@link PageRankVertex}
+ */
+public class PageRankVertexTest {
+
+  /**
+   * A local integration test on toy data: runs PageRank on a five-vertex
+   * graph in which vertex 3 has no out-edges and checks the resulting
+   * steady state probabilities.
+   */
+  @Test
+  public void testToyData() throws Exception {
+
+    // A small graph: each line is a vertex id followed by its out-neighbors
+    String[] graph = new String[] {
+        "1 4 2 3",
+        "2 1",
+        "3",
+        "4 3 2",
+        "5 2 4"
+    };
+
+    // Both parameters are declared on RandomWalkVertex, the common base class
+    Map<String, String> params = Maps.newHashMap();
+    params.put(RandomWalkVertex.MAX_SUPERSTEPS, String.valueOf(50));
+    params.put(RandomWalkVertex.TELEPORTATION_PROBABILITY,
+        String.valueOf(0.15));
+
+    GiraphClasses<LongWritable, DoubleWritable, NullWritable, DoubleWritable>
+        classes = new GiraphClasses<LongWritable, DoubleWritable,
+        NullWritable, DoubleWritable>();
+    classes.setVertexClass(PageRankVertex.class);
+    classes.setVertexEdgesClass(ByteArrayEdges.class);
+    classes.setVertexInputFormatClass(
+        LongDoubleNullDoubleTextInputFormat.class);
+    classes.setVertexOutputFormatClass(
+        VertexWithDoubleValueNullEdgeTextOutputFormat.class);
+    classes.setWorkerContextClass(RandomWalkWorkerContext.class);
+    classes.setMasterComputeClass(
+        RandomWalkVertex.RandomWalkVertexMasterCompute.class);
+    // Run internally
+    Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
+
+    Map<Long, Double> steadyStateProbabilities =
+        RandomWalkTestUtils.parseSteadyStateProbabilities(results);
+
+    // every input vertex, including the dangling one, must be in the output
+    assertEquals(5, steadyStateProbabilities.size());
+
+    assertEquals(0.28159076008518047, steadyStateProbabilities.get(1L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.2514648601529863, steadyStateProbabilities.get(2L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.22262961972286327, steadyStateProbabilities.get(3L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.17646783276703806, steadyStateProbabilities.get(4L),
+        RandomWalkTestUtils.EPSILON);
+    assertEquals(0.06784692727193153, steadyStateProbabilities.get(5L),
+        RandomWalkTestUtils.EPSILON);
+
+    // the ranks must form a probability distribution
+    double total = 0d;
+    for (double probability : steadyStateProbabilities.values()) {
+      total += probability;
+    }
+    assertEquals(1d, total, RandomWalkTestUtils.EPSILON);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
new file mode 100644
index 0000000..71528a3
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkTestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+/**
+ * Helpers shared by the random walk tests ({@link PageRankVertexTest},
+ * {@link RandomWalkWithRestartVertexTest}).
+ */
+public final class RandomWalkTestUtils {
+
+  /** Absolute tolerance (0.01) used when comparing probabilities. */
+  public static final double EPSILON = 0.01;
+
+  /** Not instantiable: this class only holds static helpers. */
+  private RandomWalkTestUtils() {
+  }
+
+  /**
+   * Parse steady state probabilities from the text output of a random walk
+   * run, where each line holds a vertex id and its probability separated by
+   * a tab.
+   * @param results The steady state probabilities in text format.
+   * @return A map from vertex id to steady state probability.
+   */
+  public static Map<Long, Double> parseSteadyStateProbabilities(
+      Iterable<String> results) {
+    Map<Long, Double> result = Maps.newHashMap();
+    for (String s : results) {
+      String[] tokens = s.split("\\t");
+      Long id = Long.valueOf(tokens[0]);
+      Double value = Double.valueOf(tokens[1]);
+      result.put(id, value);
+    }
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fc2026fa/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
index 489b35a..1ae9c52 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/RandomWalkWithRestartVertexTest.java
@@ -37,9 +37,6 @@ import static org.junit.Assert.assertEquals;
*/
public class RandomWalkWithRestartVertexTest {
- /** Minimum difference between doubles */
- private static final double EPSILON = 10e-3;
-
/**
* A local integration test on toy data
*/
@@ -69,13 +66,15 @@ public class RandomWalkWithRestartVertexTest {
Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
Map<Long, Double> steadyStateProbabilities =
- parseSteadyStateProbabilities(results);
+ RandomWalkTestUtils.parseSteadyStateProbabilities(results);
// values computed with external software
// 0.25, 0.354872, 0.09375, 0.301377
- assertEquals(0.25, steadyStateProbabilities.get(12L), EPSILON);
- assertEquals(0.354872, steadyStateProbabilities.get(34L), EPSILON);
- assertEquals(0.09375, steadyStateProbabilities.get(56L), EPSILON);
- assertEquals(0.301377, steadyStateProbabilities.get(78L), EPSILON);
+ assertEquals(0.25, steadyStateProbabilities.get(12L), RandomWalkTestUtils.EPSILON);
+ assertEquals(0.354872, steadyStateProbabilities.get(34L),
+ RandomWalkTestUtils.EPSILON);
+ assertEquals(0.09375, steadyStateProbabilities.get(56L), RandomWalkTestUtils.EPSILON);
+ assertEquals(0.301377, steadyStateProbabilities.get(78L),
+ RandomWalkTestUtils.EPSILON);
}
/**
@@ -108,29 +107,18 @@ public class RandomWalkWithRestartVertexTest {
Iterable<String> results = InternalVertexRunner.run(classes, params, graph);
Map<Long, Double> steadyStateProbabilities =
- parseSteadyStateProbabilities(results);
+ RandomWalkTestUtils.parseSteadyStateProbabilities(results);
// values computed with external software
// 0.163365, 0.378932, 0.156886, 0.300816
- assertEquals(0.163365, steadyStateProbabilities.get(12L), EPSILON);
- assertEquals(0.378932, steadyStateProbabilities.get(34L), EPSILON);
- assertEquals(0.156886, steadyStateProbabilities.get(56L), EPSILON);
- assertEquals(0.300816, steadyStateProbabilities.get(78L), EPSILON);
+ assertEquals(0.163365, steadyStateProbabilities.get(12L),
+ RandomWalkTestUtils.EPSILON);
+ assertEquals(0.378932, steadyStateProbabilities.get(34L),
+ RandomWalkTestUtils.EPSILON);
+ assertEquals(0.156886, steadyStateProbabilities.get(56L),
+ RandomWalkTestUtils.EPSILON);
+ assertEquals(0.300816, steadyStateProbabilities.get(78L),
+ RandomWalkTestUtils.EPSILON);
}
- /**
- * Parse steady state probabilities.
- * @param results The steady state probabilities in text format.
- * @return A map representation of the steady state probabilities.
- */
- private Map<Long, Double> parseSteadyStateProbabilities(
- Iterable<String> results) {
- Map<Long, Double> result = Maps.newHashMap();
- for (String s : results) {
- String[] tokens = s.split("\\t");
- Long id = Long.parseLong(tokens[0]);
- Double value = Double.parseDouble(tokens[1]);
- result.put(id, value);
- }
- return result;
- }
+
}