You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:00 UTC
[04/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
new file mode 100644
index 0000000..984e079
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpoint.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * An example that simply uses its id, value, and edges to compute new data
+ * every iteration to verify that checkpoint restarting works. Fault injection
+ * can also test automated checkpoint restarts.
+ */
+public class SimpleCheckpoint implements Tool {
+ /** Which superstep to cause the worker to fail */
+ public static final int FAULTING_SUPERSTEP = 4;
+ /** Vertex id to fault on */
+ public static final long FAULTING_VERTEX_ID = 1;
+ /** Dynamically set number of supersteps */
+ public static final String SUPERSTEP_COUNT =
+ "simpleCheckpointVertex.superstepCount";
+ /** Should fault? */
+ public static final String ENABLE_FAULT =
+ "simpleCheckpointVertex.enableFault";
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleCheckpoint.class);
+ /** Configuration */
+ private Configuration conf;
+
+ /**
+ * Actual computation.
+ */
+ public static class SimpleCheckpointComputation extends
+ BasicComputation<LongWritable, IntWritable, FloatWritable,
+ FloatWritable> {
+ @Override
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<FloatWritable> messages) throws IOException {
+ SimpleCheckpointVertexWorkerContext workerContext = getWorkerContext();
+
+ boolean enableFault = workerContext.getEnableFault();
+ int supersteps = workerContext.getSupersteps();
+
+ if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
+ (getContext().getTaskAttemptID().getId() == 0) &&
+ (vertex.getId().get() == FAULTING_VERTEX_ID)) {
+ LOG.info("compute: Forced a fault on the first " +
+ "attempt of superstep " +
+ FAULTING_SUPERSTEP + " and vertex id " +
+ FAULTING_VERTEX_ID);
+ System.exit(-1);
+ }
+ if (getSuperstep() > supersteps) {
+ vertex.voteToHalt();
+ return;
+ }
+ long sumAgg = this.<LongWritable>getAggregatedValue(
+ LongSumAggregator.class.getName()).get();
+ LOG.info("compute: " + sumAgg);
+ aggregate(LongSumAggregator.class.getName(),
+ new LongWritable(vertex.getId().get()));
+ LOG.info("compute: sum = " + sumAgg +
+ " for vertex " + vertex.getId());
+ float msgValue = 0.0f;
+ for (FloatWritable message : messages) {
+ float curMsgValue = message.get();
+ msgValue += curMsgValue;
+ LOG.info("compute: got msgValue = " + curMsgValue +
+ " for vertex " + vertex.getId() +
+ " on superstep " + getSuperstep());
+ }
+ int vertexValue = vertex.getValue().get();
+ vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
+ LOG.info("compute: vertex " + vertex.getId() +
+ " has value " + vertex.getValue() +
+ " on superstep " + getSuperstep());
+ for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+ FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
+ (float) vertexValue);
+ Edge<LongWritable, FloatWritable> newEdge =
+ EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
+ LOG.info("compute: vertex " + vertex.getId() +
+ " sending edgeValue " + edge.getValue() +
+ " vertexValue " + vertexValue +
+ " total " + newEdgeValue +
+ " to vertex " + edge.getTargetVertexId() +
+ " on superstep " + getSuperstep());
+ vertex.addEdge(newEdge);
+ sendMessage(edge.getTargetVertexId(), newEdgeValue);
+ }
+ }
+ }
+
+ /**
+ * Worker context associated with {@link SimpleCheckpoint}.
+ */
+ public static class SimpleCheckpointVertexWorkerContext
+ extends WorkerContext {
+ /** Filename to indicate whether a fault was found */
+ public static final String FAULT_FILE = "/tmp/faultFile";
+ /** User can access this after the application finishes if local */
+ private static long FINAL_SUM;
+ /** Number of supersteps to run (6 by default) */
+ private int supersteps = 6;
+ /** Enable the fault at the particular vertex id and superstep? */
+ private boolean enableFault = false;
+
+ public static long getFinalSum() {
+ return FINAL_SUM;
+ }
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ supersteps = getContext().getConfiguration()
+ .getInt(SUPERSTEP_COUNT, supersteps);
+ enableFault = getContext().getConfiguration()
+ .getBoolean(ENABLE_FAULT, false);
+ }
+
+ @Override
+ public void postApplication() {
+ FINAL_SUM = this.<LongWritable>getAggregatedValue(
+ LongSumAggregator.class.getName()).get();
+ LOG.info("FINAL_SUM=" + FINAL_SUM);
+ }
+
+ @Override
+ public void preSuperstep() {
+ }
+
+ @Override
+ public void postSuperstep() { }
+
+ public int getSupersteps() {
+ return this.supersteps;
+ }
+
+ public boolean getEnableFault() {
+ return this.enableFault;
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("w",
+ "workers",
+ true,
+ "Number of workers");
+ options.addOption("s",
+ "supersteps",
+ true,
+ "Supersteps to execute before finishing");
+ options.addOption("w",
+ "workers",
+ true,
+ "Minimum number of workers");
+ options.addOption("o",
+ "outputDirectory",
+ true,
+ "Output directory");
+ HelpFormatter formatter = new HelpFormatter();
+ if (args.length == 0) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ if (cmd.hasOption('h')) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ if (!cmd.hasOption('w')) {
+ LOG.info("Need to choose the number of workers (-w)");
+ return -1;
+ }
+ if (!cmd.hasOption('o')) {
+ LOG.info("Need to set the output directory (-o)");
+ return -1;
+ }
+
+ GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+ bspJob.getConfiguration().setComputationClass(
+ SimpleCheckpointComputation.class);
+ bspJob.getConfiguration().setVertexInputFormatClass(
+ GeneratedVertexInputFormat.class);
+ bspJob.getConfiguration().setVertexOutputFormatClass(
+ IdWithValueTextOutputFormat.class);
+ bspJob.getConfiguration().setWorkerContextClass(
+ SimpleCheckpointVertexWorkerContext.class);
+ bspJob.getConfiguration().setMasterComputeClass(
+ SimpleCheckpointVertexMasterCompute.class);
+ int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ bspJob.getConfiguration().setWorkerConfiguration(
+ minWorkers, maxWorkers, 100.0f);
+
+ FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
+ new Path(cmd.getOptionValue('o')));
+ boolean verbose = false;
+ if (cmd.hasOption('v')) {
+ verbose = true;
+ }
+ if (cmd.hasOption('s')) {
+ getConf().setInt(SUPERSTEP_COUNT,
+ Integer.parseInt(cmd.getOptionValue('s')));
+ }
+ if (bspJob.run(verbose)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Master compute associated with {@link SimpleCheckpoint}.
+ * It registers required aggregators.
+ */
+ public static class SimpleCheckpointVertexMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ }
+ }
+
+ /**
+ * Executable from the command line.
+ *
+ * @param args Command line args.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new SimpleCheckpoint(), args));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
deleted file mode 100644
index 03d977b..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-
-/**
- * An example that simply uses its id, value, and edges to compute new data
- * every iteration to verify that checkpoint restarting works. Fault injection
- * can also test automated checkpoint restarts.
- */
-public class SimpleCheckpointVertex implements Tool {
- /** Which superstep to cause the worker to fail */
- public static final int FAULTING_SUPERSTEP = 4;
- /** Vertex id to fault on */
- public static final long FAULTING_VERTEX_ID = 1;
- /** Dynamically set number of supersteps */
- public static final String SUPERSTEP_COUNT =
- "simpleCheckpointVertex.superstepCount";
- /** Should fault? */
- public static final String ENABLE_FAULT =
- "simpleCheckpointVertex.enableFault";
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleCheckpointVertex.class);
- /** Configuration */
- private Configuration conf;
-
- /**
- * Actual computation.
- */
- public static class SimpleCheckpointComputation extends
- Vertex<LongWritable, IntWritable, FloatWritable, FloatWritable> {
- @Override
- public void compute(Iterable<FloatWritable> messages) {
- SimpleCheckpointVertexWorkerContext workerContext =
- (SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
- boolean enableFault = workerContext.getEnableFault();
- int supersteps = workerContext.getSupersteps();
-
- if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
- (getContext().getTaskAttemptID().getId() == 0) &&
- (getId().get() == FAULTING_VERTEX_ID)) {
- LOG.info("compute: Forced a fault on the first " +
- "attempt of superstep " +
- FAULTING_SUPERSTEP + " and vertex id " +
- FAULTING_VERTEX_ID);
- System.exit(-1);
- }
- if (getSuperstep() > supersteps) {
- voteToHalt();
- return;
- }
- long sumAgg = this.<LongWritable>getAggregatedValue(
- LongSumAggregator.class.getName()).get();
- LOG.info("compute: " + sumAgg);
- aggregate(LongSumAggregator.class.getName(),
- new LongWritable(getId().get()));
- LOG.info("compute: sum = " + sumAgg +
- " for vertex " + getId());
- float msgValue = 0.0f;
- for (FloatWritable message : messages) {
- float curMsgValue = message.get();
- msgValue += curMsgValue;
- LOG.info("compute: got msgValue = " + curMsgValue +
- " for vertex " + getId() +
- " on superstep " + getSuperstep());
- }
- int vertexValue = getValue().get();
- setValue(new IntWritable(vertexValue + (int) msgValue));
- LOG.info("compute: vertex " + getId() +
- " has value " + getValue() +
- " on superstep " + getSuperstep());
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
- FloatWritable newEdgeValue = new FloatWritable(edge.getValue().get() +
- (float) vertexValue);
- Edge<LongWritable, FloatWritable> newEdge =
- EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
- LOG.info("compute: vertex " + getId() +
- " sending edgeValue " + edge.getValue() +
- " vertexValue " + vertexValue +
- " total " + newEdgeValue +
- " to vertex " + edge.getTargetVertexId() +
- " on superstep " + getSuperstep());
- addEdge(newEdge);
- sendMessage(edge.getTargetVertexId(), newEdgeValue);
- }
- }
- }
-
- /**
- * Worker context associated with {@link SimpleCheckpointVertex}.
- */
- public static class SimpleCheckpointVertexWorkerContext
- extends WorkerContext {
- /** Filename to indicate whether a fault was found */
- public static final String FAULT_FILE = "/tmp/faultFile";
- /** User can access this after the application finishes if local */
- private static long FINAL_SUM;
- /** Number of supersteps to run (6 by default) */
- private int supersteps = 6;
- /** Enable the fault at the particular vertex id and superstep? */
- private boolean enableFault = false;
-
- public static long getFinalSum() {
- return FINAL_SUM;
- }
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- supersteps = getContext().getConfiguration()
- .getInt(SUPERSTEP_COUNT, supersteps);
- enableFault = getContext().getConfiguration()
- .getBoolean(ENABLE_FAULT, false);
- }
-
- @Override
- public void postApplication() {
- FINAL_SUM = this.<LongWritable>getAggregatedValue(
- LongSumAggregator.class.getName()).get();
- LOG.info("FINAL_SUM=" + FINAL_SUM);
- }
-
- @Override
- public void preSuperstep() {
- }
-
- @Override
- public void postSuperstep() { }
-
- public int getSupersteps() {
- return this.supersteps;
- }
-
- public boolean getEnableFault() {
- return this.enableFault;
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("w",
- "workers",
- true,
- "Number of workers");
- options.addOption("s",
- "supersteps",
- true,
- "Supersteps to execute before finishing");
- options.addOption("w",
- "workers",
- true,
- "Minimum number of workers");
- options.addOption("o",
- "outputDirectory",
- true,
- "Output directory");
- HelpFormatter formatter = new HelpFormatter();
- if (args.length == 0) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(options, args);
- if (cmd.hasOption('h')) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- if (!cmd.hasOption('w')) {
- LOG.info("Need to choose the number of workers (-w)");
- return -1;
- }
- if (!cmd.hasOption('o')) {
- LOG.info("Need to set the output directory (-o)");
- return -1;
- }
-
- GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
- bspJob.getConfiguration().setVertexClass(SimpleCheckpointComputation.class);
- bspJob.getConfiguration().setVertexInputFormatClass(
- GeneratedVertexInputFormat.class);
- bspJob.getConfiguration().setVertexOutputFormatClass(
- IdWithValueTextOutputFormat.class);
- bspJob.getConfiguration().setWorkerContextClass(
- SimpleCheckpointVertexWorkerContext.class);
- bspJob.getConfiguration().setMasterComputeClass(
- SimpleCheckpointVertexMasterCompute.class);
- int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
- int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
- bspJob.getConfiguration().setWorkerConfiguration(
- minWorkers, maxWorkers, 100.0f);
-
- FileOutputFormat.setOutputPath(bspJob.getInternalJob(),
- new Path(cmd.getOptionValue('o')));
- boolean verbose = false;
- if (cmd.hasOption('v')) {
- verbose = true;
- }
- if (cmd.hasOption('s')) {
- getConf().setInt(SUPERSTEP_COUNT,
- Integer.parseInt(cmd.getOptionValue('s')));
- }
- if (bspJob.run(verbose)) {
- return 0;
- } else {
- return -1;
- }
- }
-
- /**
- * Master compute associated with {@link SimpleCheckpointVertex}.
- * It registers required aggregators.
- */
- public static class SimpleCheckpointVertexMasterCompute extends
- DefaultMasterCompute {
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- }
- }
-
- /**
- * Executable from the command line.
- *
- * @param args Command line args.
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
new file mode 100644
index 0000000..551a2a2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerComputation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Test whether messages can go through a combiner.
+ */
+public class SimpleCombinerComputation extends
+ BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleCombinerComputation.class);
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<IntWritable> messages) throws IOException {
+ if (vertex.getId().equals(new LongWritable(2))) {
+ sendMessage(new LongWritable(1), new IntWritable(101));
+ sendMessage(new LongWritable(1), new IntWritable(102));
+ sendMessage(new LongWritable(1), new IntWritable(103));
+ }
+ if (!vertex.getId().equals(new LongWritable(1))) {
+ vertex.voteToHalt();
+ } else {
+ // Check the messages
+ int sum = 0;
+ int num = 0;
+ for (IntWritable message : messages) {
+ sum += message.get();
+ num++;
+ }
+ LOG.info("TestCombinerVertex: Received a sum of " + sum +
+ " (should have 306 with a single message value)");
+
+ if (num == 1 && sum == 306) {
+ vertex.voteToHalt();
+ }
+ }
+ if (getSuperstep() > 3) {
+ throw new IllegalStateException(
+ "TestCombinerVertex: Vertex 1 failed to receive " +
+ "messages in time");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
deleted file mode 100644
index f4475ae..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Test whether messages can go through a combiner.
- */
-public class SimpleCombinerVertex extends
- Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
-
- @Override
- public void compute(Iterable<IntWritable> messages) {
- if (getId().equals(new LongWritable(2))) {
- sendMessage(new LongWritable(1), new IntWritable(101));
- sendMessage(new LongWritable(1), new IntWritable(102));
- sendMessage(new LongWritable(1), new IntWritable(103));
- }
- if (!getId().equals(new LongWritable(1))) {
- voteToHalt();
- } else {
- // Check the messages
- int sum = 0;
- int num = 0;
- for (IntWritable message : messages) {
- sum += message.get();
- num++;
- }
- LOG.info("TestCombinerVertex: Received a sum of " + sum +
- " (should have 306 with a single message value)");
-
- if (num == 1 && sum == 306) {
- voteToHalt();
- }
- }
- if (getSuperstep() > 3) {
- throw new IllegalStateException(
- "TestCombinerVertex: Vertex 1 failed to receive " +
- "messages in time");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
new file mode 100644
index 0000000..1582902
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailComputation.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Vertex to allow unit testing of failure detection
+ */
+public class SimpleFailComputation extends BasicComputation<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleFailComputation.class);
+ /** TODO: Change this behavior to WorkerContext */
+ private static long SUPERSTEP = 0;
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ for (DoubleWritable message : messages) {
+ sum += message.get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+ vertex.setValue(vertexValue);
+ if (getSuperstep() < 30) {
+ if (getSuperstep() == 20) {
+ if (vertex.getId().get() == 10L) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ LOG.info("Sleep interrupted ", e);
+ }
+ System.exit(1);
+ } else if (getSuperstep() - SUPERSTEP > 10) {
+ return;
+ }
+ }
+ long edges = vertex.getNumEdges();
+ sendMessageToAllEdges(vertex,
+ new DoubleWritable(vertex.getValue().get() / edges));
+ } else {
+ vertex.voteToHalt();
+ }
+ SUPERSTEP = getSuperstep();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
deleted file mode 100644
index c0e206c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleFailVertex.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Vertex to allow unit testing of failure detection
- */
-public class SimpleFailVertex extends Vertex<
- LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(SimpleFailVertex.class);
- /** TODO: Change this behavior to WorkerContext */
- private static long SUPERSTEP = 0;
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() >= 1) {
- double sum = 0;
- for (DoubleWritable message : messages) {
- sum += message.get();
- }
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
- setValue(vertexValue);
- if (getSuperstep() < 30) {
- if (getSuperstep() == 20) {
- if (getId().get() == 10L) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted ", e);
- }
- System.exit(1);
- } else if (getSuperstep() - SUPERSTEP > 10) {
- return;
- }
- }
- long edges = getNumEdges();
- sendMessageToAllEdges(
- new DoubleWritable(getValue().get() / edges));
- } else {
- voteToHalt();
- }
- SUPERSTEP = getSuperstep();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
new file mode 100644
index 0000000..1513401
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountComputation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Simple function to return the out degree for each vertex.
+ */
+@Algorithm(
+ name = "Indegree Count"
+)
+public class SimpleInDegreeCountComputation extends BasicComputation<
+ LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, LongWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() == 0) {
+ Iterable<Edge<LongWritable, DoubleWritable>> edges = vertex.getEdges();
+ for (Edge<LongWritable, DoubleWritable> edge : edges) {
+ sendMessage(edge.getTargetVertexId(), new DoubleWritable(1.0));
+ }
+ } else {
+ long sum = 0;
+ for (DoubleWritable message : messages) {
+ sum++;
+ }
+ LongWritable vertexValue = vertex.getValue();
+ vertexValue.set(sum);
+ vertex.setValue(vertexValue);
+ vertex.voteToHalt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
deleted file mode 100644
index 3bcf3f5..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleInDegreeCountVertex.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Simple function to return the out degree for each vertex.
- */
-@Algorithm(
- name = "Indegree Count"
-)
-public class SimpleInDegreeCountVertex extends Vertex<
- LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() == 0) {
- Iterable<Edge<LongWritable, DoubleWritable>> edges = getEdges();
- for (Edge<LongWritable, DoubleWritable> edge : edges) {
- sendMessage(edge.getTargetVertexId(), new DoubleWritable(1.0));
- }
- } else {
- long sum = 0;
- for (DoubleWritable message : messages) {
- sum++;
- }
- LongWritable vertexValue = getValue();
- vertexValue.set(sum);
- setValue(vertexValue);
- voteToHalt();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
new file mode 100644
index 0000000..96e0403
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityComputation.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * A simple use of the Identity Vertex for taking care of Long, Double,
+ * Double, Double type Inputformat Good for use with
+ * io.LongDoubleDoubleAdjacencyListVertexInputFormat
+ */
+
+public abstract class SimpleLongDoubleDoubleDoubleIdentityComputation extends
+ IdentityComputation<LongWritable, DoubleWritable,
+ DoubleWritable, DoubleWritable> { }
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
deleted file mode 100644
index c7349d1..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleLongDoubleDoubleDoubleIdentityVertex.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * A simple use of the Identity Vertex for taking care of Long, Double,
- * Double, Double type Inputformat Good for use with
- * io.LongDoubleDoubleAdjacencyListVertexInputFormat
- */
-
-public abstract class SimpleLongDoubleDoubleDoubleIdentityVertex extends
- IdentityVertex<LongWritable, DoubleWritable,
- DoubleWritable, DoubleWritable> { }
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
new file mode 100644
index 0000000..b9b2373
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeComputation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Demonstrates a computation with a centralized part implemented via a
+ * MasterCompute.
+ */
+public class SimpleMasterComputeComputation extends BasicComputation<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Aggregator to get values from the master to the workers */
+ public static final String SMC_AGG = "simplemastercompute.aggregator";
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleMasterComputeComputation.class);
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ double oldSum = getSuperstep() == 0 ? 0 : vertex.getValue().get();
+ double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
+ double newSum = oldSum + newValue;
+ vertex.setValue(new DoubleWritable(newSum));
+ SimpleMasterComputeWorkerContext workerContext = getWorkerContext();
+ workerContext.setFinalSum(newSum);
+ LOG.info("Current sum: " + newSum);
+ }
+
+ /**
+ * Worker context used with {@link SimpleMasterComputeComputation}.
+ */
+ public static class SimpleMasterComputeWorkerContext
+ extends WorkerContext {
+ /** Final sum value for verification for local jobs */
+ private static double FINAL_SUM;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ }
+
+ @Override
+ public void preSuperstep() {
+ }
+
+ @Override
+ public void postSuperstep() {
+ }
+
+ @Override
+ public void postApplication() {
+ }
+
+ public void setFinalSum(double sum) {
+ FINAL_SUM = sum;
+ }
+
+ public static double getFinalSum() {
+ return FINAL_SUM;
+ }
+ }
+
+ /**
+ * MasterCompute used with {@link SimpleMasterComputeComputation}.
+ */
+ public static class SimpleMasterCompute
+ extends DefaultMasterCompute {
+ @Override
+ public void compute() {
+ setAggregatedValue(SMC_AGG,
+ new DoubleWritable(((double) getSuperstep()) / 2 + 1));
+ if (getSuperstep() == 10) {
+ haltComputation();
+ }
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
deleted file mode 100644
index 8a21ad7..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMasterComputeVertex.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.DoubleOverwriteAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates a computation with a centralized part implemented via a
- * MasterCompute.
- */
-public class SimpleMasterComputeVertex extends Vertex<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable> {
- /** Aggregator to get values from the master to the workers */
- public static final String SMC_AGG = "simplemastercompute.aggregator";
- /** Logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleMasterComputeVertex.class);
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- double oldSum = getSuperstep() == 0 ? 0 : getValue().get();
- double newValue = this.<DoubleWritable>getAggregatedValue(SMC_AGG).get();
- double newSum = oldSum + newValue;
- setValue(new DoubleWritable(newSum));
- SimpleMasterComputeWorkerContext workerContext =
- (SimpleMasterComputeWorkerContext) getWorkerContext();
- workerContext.setFinalSum(newSum);
- LOG.info("Current sum: " + newSum);
- }
-
- /**
- * Worker context used with {@link SimpleMasterComputeVertex}.
- */
- public static class SimpleMasterComputeWorkerContext
- extends WorkerContext {
- /** Final sum value for verification for local jobs */
- private static double FINAL_SUM;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void preSuperstep() {
- }
-
- @Override
- public void postSuperstep() {
- }
-
- @Override
- public void postApplication() {
- }
-
- public void setFinalSum(double sum) {
- FINAL_SUM = sum;
- }
-
- public static double getFinalSum() {
- return FINAL_SUM;
- }
- }
-
- /**
- * MasterCompute used with {@link SimpleMasterComputeVertex}.
- */
- public static class SimpleMasterCompute
- extends DefaultMasterCompute {
- @Override
- public void compute() {
- setAggregatedValue(SMC_AGG,
- new DoubleWritable(((double) getSuperstep()) / 2 + 1));
- if (getSuperstep() == 10) {
- haltComputation();
- }
- }
-
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(SMC_AGG, DoubleOverwriteAggregator.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
new file mode 100644
index 0000000..75e69ab
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgComputation.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Test whether messages can be sent and received by vertices.
+ */
+public class SimpleMsgComputation extends
+ BasicComputation<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleMsgComputation.class);
+ @Override
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<IntWritable> messages) throws IOException {
+ if (vertex.getId().equals(new LongWritable(2))) {
+ sendMessage(new LongWritable(1), new IntWritable(101));
+ sendMessage(new LongWritable(1), new IntWritable(102));
+ sendMessage(new LongWritable(1), new IntWritable(103));
+ }
+ if (!vertex.getId().equals(new LongWritable(1))) {
+ vertex.voteToHalt();
+ } else {
+ /* Check the messages */
+ int sum = 0;
+ for (IntWritable message : messages) {
+ sum += message.get();
+ }
+ LOG.info("compute: Received a sum of " + sum +
+ " (will stop on 306)");
+
+ if (sum == 306) {
+ vertex.voteToHalt();
+ }
+ }
+ if (getSuperstep() > 3) {
+ System.err.println("compute: Vertex 1 failed to receive " +
+ "messages in time");
+ vertex.voteToHalt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
deleted file mode 100644
index 024fe9d..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMsgVertex.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Test whether messages can be sent and received by vertices.
- */
-public class SimpleMsgVertex extends
- Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(SimpleMsgVertex.class);
- @Override
- public void compute(Iterable<IntWritable> messages) {
- if (getId().equals(new LongWritable(2))) {
- sendMessage(new LongWritable(1), new IntWritable(101));
- sendMessage(new LongWritable(1), new IntWritable(102));
- sendMessage(new LongWritable(1), new IntWritable(103));
- }
- if (!getId().equals(new LongWritable(1))) {
- voteToHalt();
- } else {
- /* Check the messages */
- int sum = 0;
- for (IntWritable message : messages) {
- sum += message.get();
- }
- LOG.info("compute: Received a sum of " + sum +
- " (will stop on 306)");
-
- if (sum == 306) {
- voteToHalt();
- }
- }
- if (getSuperstep() > 3) {
- System.err.println("compute: Vertex 1 failed to receive " +
- "messages in time");
- voteToHalt();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
new file mode 100644
index 0000000..fc3b6ad
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphComputation.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Vertex to allow unit testing of graph mutations.
+ */
+public class SimpleMutateGraphComputation extends BasicComputation<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Class logger */
+ private static Logger LOG =
+ Logger.getLogger(SimpleMutateGraphComputation.class);
+ /** Maximum number of ranges for vertex ids */
+ private long maxRanges = 100;
+
+
+ /**
+ * Unless we create a ridiculous number of vertices , we should not
+ * collide within a vertex range defined by this method.
+ *
+ * @param range Range index
+ * @return Starting vertex id of the range
+ */
+ private long rangeVertexIdStart(int range) {
+ return (Long.MAX_VALUE / maxRanges) * range;
+ }
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ SimpleMutateGraphVertexWorkerContext workerContext = getWorkerContext();
+ if (getSuperstep() == 0) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 1) {
+ // Send messages to vertices that are sure not to exist
+ // (creating them)
+ LongWritable destVertexId =
+ new LongWritable(rangeVertexIdStart(1) + vertex.getId().get());
+ sendMessage(destVertexId, new DoubleWritable(0.0));
+ } else if (getSuperstep() == 2) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 3) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getTotalNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumVertices() +
+ " vertices when should have " + vertexCount * 2 +
+ " on superstep " + getSuperstep());
+ }
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount != getTotalNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumEdges() +
+ " edges when should have " + edgeCount +
+ " on superstep " + getSuperstep());
+ }
+ // Create vertices that are sure not to exist (doubling vertices)
+ LongWritable vertexIndex =
+ new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
+ addVertexRequest(vertexIndex, new DoubleWritable(0.0));
+ // Add edges to those remote vertices as well
+ addEdgeRequest(vertexIndex,
+ EdgeFactory.create(vertex.getId(), new FloatWritable(0.0f)));
+ } else if (getSuperstep() == 4) {
+ LOG.debug("Reached superstep " + getSuperstep());
+ } else if (getSuperstep() == 5) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount * 2 != getTotalNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumVertices() +
+ " when should have " + vertexCount * 2 +
+ " on superstep " + getSuperstep());
+ }
+ long edgeCount = workerContext.getEdgeCount();
+ if (edgeCount + vertexCount != getTotalNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumEdges() +
+ " edges when should have " + edgeCount + vertexCount +
+ " on superstep " + getSuperstep());
+ }
+ // Remove the edges created in superstep 3
+ LongWritable vertexIndex =
+ new LongWritable(rangeVertexIdStart(3) + vertex.getId().get());
+ workerContext.increaseEdgesRemoved();
+ removeEdgesRequest(vertexIndex, vertex.getId());
+ } else if (getSuperstep() == 6) {
+ // Remove all the vertices created in superstep 3
+ if (vertex.getId().compareTo(
+ new LongWritable(rangeVertexIdStart(3))) >= 0) {
+ removeVertexRequest(vertex.getId());
+ }
+ } else if (getSuperstep() == 7) {
+ long origEdgeCount = workerContext.getOrigEdgeCount();
+ if (origEdgeCount != getTotalNumEdges()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumEdges() +
+ " edges when should have " + origEdgeCount +
+ " on superstep " + getSuperstep());
+ }
+ } else if (getSuperstep() == 8) {
+ long vertexCount = workerContext.getVertexCount();
+ if (vertexCount / 2 != getTotalNumVertices()) {
+ throw new IllegalStateException(
+ "Impossible to have " + getTotalNumVertices() +
+ " vertices when should have " + vertexCount / 2 +
+ " on superstep " + getSuperstep());
+ }
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+
+ /**
+ * Worker context used with {@link SimpleMutateGraphComputation}.
+ */
+ public static class SimpleMutateGraphVertexWorkerContext
+ extends WorkerContext {
+ /** Cached vertex count */
+ private long vertexCount;
+ /** Cached edge count */
+ private long edgeCount;
+ /** Original number of edges */
+ private long origEdgeCount;
+ /** Number of edges removed during superstep */
+ private int edgesRemoved = 0;
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException { }
+
+ @Override
+ public void postApplication() { }
+
+ @Override
+ public void preSuperstep() { }
+
+ @Override
+ public void postSuperstep() {
+ vertexCount = getTotalNumVertices();
+ edgeCount = getTotalNumEdges();
+ if (getSuperstep() == 1) {
+ origEdgeCount = edgeCount;
+ }
+ LOG.info("Got " + vertexCount + " vertices, " +
+ edgeCount + " edges on superstep " +
+ getSuperstep());
+ LOG.info("Removed " + edgesRemoved);
+ edgesRemoved = 0;
+ }
+
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ public long getOrigEdgeCount() {
+ return origEdgeCount;
+ }
+
+ /**
+ * Increase the number of edges removed by one.
+ */
+ public void increaseEdgesRemoved() {
+ this.edgesRemoved++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
deleted file mode 100644
index a468491..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-
-/**
- * Vertex to allow unit testing of graph mutations.
- */
-public class SimpleMutateGraphVertex extends Vertex<
- LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
- /** Class logger */
- private static Logger LOG =
- Logger.getLogger(SimpleMutateGraphVertex.class);
- /** Maximum number of ranges for vertex ids */
- private long maxRanges = 100;
-
-
- /**
- * Unless we create a ridiculous number of vertices , we should not
- * collide within a vertex range defined by this method.
- *
- * @param range Range index
- * @return Starting vertex id of the range
- */
- private long rangeVertexIdStart(int range) {
- return (Long.MAX_VALUE / maxRanges) * range;
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages)
- throws IOException {
- SimpleMutateGraphVertexWorkerContext workerContext =
- (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
- if (getSuperstep() == 0) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 1) {
- // Send messages to vertices that are sure not to exist
- // (creating them)
- LongWritable destVertexId =
- new LongWritable(rangeVertexIdStart(1) + getId().get());
- sendMessage(destVertexId, new DoubleWritable(0.0));
- } else if (getSuperstep() == 2) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 3) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " vertices when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + edgeCount +
- " on superstep " + getSuperstep());
- }
- // Create vertices that are sure not to exist (doubling vertices)
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getId().get());
- addVertexRequest(vertexIndex, new DoubleWritable(0.0));
- // Add edges to those remote vertices as well
- addEdgeRequest(vertexIndex,
- EdgeFactory.create(getId(), new FloatWritable(0.0f)));
- } else if (getSuperstep() == 4) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 5) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount + vertexCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + edgeCount + vertexCount +
- " on superstep " + getSuperstep());
- }
- // Remove the edges created in superstep 3
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getId().get());
- workerContext.increaseEdgesRemoved();
- removeEdgesRequest(vertexIndex, getId());
- } else if (getSuperstep() == 6) {
- // Remove all the vertices created in superstep 3
- if (getId().compareTo(
- new LongWritable(rangeVertexIdStart(3))) >= 0) {
- removeVertexRequest(getId());
- }
- } else if (getSuperstep() == 7) {
- long origEdgeCount = workerContext.getOrigEdgeCount();
- if (origEdgeCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + origEdgeCount +
- " on superstep " + getSuperstep());
- }
- } else if (getSuperstep() == 8) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount / 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " vertices when should have " + vertexCount / 2 +
- " on superstep " + getSuperstep());
- }
- } else {
- voteToHalt();
- }
- }
-
- /**
- * Worker context used with {@link SimpleMutateGraphVertex}.
- */
- public static class SimpleMutateGraphVertexWorkerContext
- extends WorkerContext {
- /** Cached vertex count */
- private long vertexCount;
- /** Cached edge count */
- private long edgeCount;
- /** Original number of edges */
- private long origEdgeCount;
- /** Number of edges removed during superstep */
- private int edgesRemoved = 0;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException { }
-
- @Override
- public void postApplication() { }
-
- @Override
- public void preSuperstep() { }
-
- @Override
- public void postSuperstep() {
- vertexCount = getTotalNumVertices();
- edgeCount = getTotalNumEdges();
- if (getSuperstep() == 1) {
- origEdgeCount = edgeCount;
- }
- LOG.info("Got " + vertexCount + " vertices, " +
- edgeCount + " edges on superstep " +
- getSuperstep());
- LOG.info("Removed " + edgesRemoved);
- edgesRemoved = 0;
- }
-
- public long getVertexCount() {
- return vertexCount;
- }
-
- public long getEdgeCount() {
- return edgeCount;
- }
-
- public long getOrigEdgeCount() {
- return origEdgeCount;
- }
-
- /**
- * Increase the number of edges removed by one.
- */
- public void increaseEdgesRemoved() {
- this.edgesRemoved++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
new file mode 100644
index 0000000..36dc727
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountComputation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.giraph.graph.Vertex;
+
+import java.io.IOException;
+
+/**
+ * Simple function to return the out degree for each vertex.
+ */
+@Algorithm(
+ name = "Outdegree Count"
+)
+public class SimpleOutDegreeCountComputation extends BasicComputation<
+ LongWritable, LongWritable, DoubleWritable, DoubleWritable> {
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, LongWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ LongWritable vertexValue = vertex.getValue();
+ vertexValue.set(vertex.getNumEdges());
+ vertex.setValue(vertexValue);
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
deleted file mode 100644
index c830fa2..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.graph.Vertex;
-
-
-/**
- * Simple function to return the out degree for each vertex.
- */
-@Algorithm(
- name = "Outdegree Count"
-)
-public class SimpleOutDegreeCountVertex extends Vertex<
- LongWritable, LongWritable,
- DoubleWritable, DoubleWritable> {
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- LongWritable vertexValue = getValue();
- vertexValue.set(getNumEdges());
- setValue(vertexValue);
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
new file mode 100644
index 0000000..d053bb3
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankComputation.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import org.apache.giraph.aggregators.DoubleMaxAggregator;
+import org.apache.giraph.aggregators.DoubleMinAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Demonstrates the basic Pregel PageRank implementation.
+ */
+@Algorithm(
+ name = "Page rank"
+)
+public class SimplePageRankComputation extends BasicComputation<LongWritable,
+ DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Number of supersteps for this test */
+ public static final int MAX_SUPERSTEPS = 30;
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimplePageRankComputation.class);
+ /** Sum aggregator name */
+ private static String SUM_AGG = "sum";
+ /** Min aggregator name */
+ private static String MIN_AGG = "min";
+ /** Max aggregator name */
+ private static String MAX_AGG = "max";
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ for (DoubleWritable message : messages) {
+ sum += message.get();
+ }
+ DoubleWritable vertexValue =
+ new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
+ vertex.setValue(vertexValue);
+ aggregate(MAX_AGG, vertexValue);
+ aggregate(MIN_AGG, vertexValue);
+ aggregate(SUM_AGG, new LongWritable(1));
+ LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
+ " max=" + getAggregatedValue(MAX_AGG) +
+ " min=" + getAggregatedValue(MIN_AGG));
+ }
+
+ if (getSuperstep() < MAX_SUPERSTEPS) {
+ long edges = vertex.getNumEdges();
+ sendMessageToAllEdges(vertex,
+ new DoubleWritable(vertex.getValue().get() / edges));
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+
+ /**
+ * Worker context used with {@link SimplePageRankComputation}.
+ */
+ public static class SimplePageRankWorkerContext extends
+ WorkerContext {
+ /** Final max value for verification for local jobs */
+ private static double FINAL_MAX;
+ /** Final min value for verification for local jobs */
+ private static double FINAL_MIN;
+ /** Final sum value for verification for local jobs */
+ private static long FINAL_SUM;
+
+ public static double getFinalMax() {
+ return FINAL_MAX;
+ }
+
+ public static double getFinalMin() {
+ return FINAL_MIN;
+ }
+
+ public static long getFinalSum() {
+ return FINAL_SUM;
+ }
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ }
+
+ @Override
+ public void postApplication() {
+ FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+ FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
+ FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
+
+ LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+ LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+ LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
+ }
+
+ @Override
+ public void preSuperstep() {
+ if (getSuperstep() >= 3) {
+ LOG.info("aggregatedNumVertices=" +
+ getAggregatedValue(SUM_AGG) +
+ " NumVertices=" + getTotalNumVertices());
+ if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+ getTotalNumVertices()) {
+ throw new RuntimeException("wrong value of SumAggreg: " +
+ getAggregatedValue(SUM_AGG) + ", should be: " +
+ getTotalNumVertices());
+ }
+ DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
+ LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+ DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
+ LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+ }
+ }
+
+ @Override
+ public void postSuperstep() { }
+ }
+
+ /**
+ * Master compute associated with {@link SimplePageRankComputation}.
+ * It registers required aggregators.
+ */
+ public static class SimplePageRankMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(SUM_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
+ registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
+ }
+ }
+
+ /**
+ * Simple VertexReader that supports {@link SimplePageRankComputation}
+ */
+ public static class SimplePageRankVertexReader extends
+ GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimplePageRankVertexReader.class);
+
+ @Override
+ public boolean nextVertex() {
+ return totalRecords > recordsRead;
+ }
+
+ @Override
+ public Vertex<LongWritable, DoubleWritable,
+ FloatWritable> getCurrentVertex() throws IOException {
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex =
+ getConf().createVertex();
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
+ long targetVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
+ edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+ new FloatWritable(edgeValue)));
+ vertex.initialize(vertexId, vertexValue, edges);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+ }
+ return vertex;
+ }
+ }
+
+ /**
+ * Simple VertexInputFormat that supports {@link SimplePageRankComputation}
+ */
+ public static class SimplePageRankVertexInputFormat extends
+ GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
+ @Override
+ public VertexReader<LongWritable, DoubleWritable,
+ FloatWritable> createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new SimplePageRankVertexReader();
+ }
+ }
+
+ /**
+ * Simple VertexOutputFormat that supports {@link SimplePageRankComputation}
+ */
+ public static class SimplePageRankVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SimplePageRankVertexWriter();
+ }
+
+ /**
+ * Simple VertexWriter that supports {@link SimplePageRankComputation}
+ */
+ public class SimplePageRankVertexWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex)
+ throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
+ }
+ }
+ }
+}