You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:26:59 UTC
[03/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
deleted file mode 100644
index 7a63e8d..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import org.apache.giraph.aggregators.DoubleMaxAggregator;
-import org.apache.giraph.aggregators.DoubleMinAggregator;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates the basic Pregel PageRank implementation.
- */
-@Algorithm(
- name = "Page rank"
-)
-public class SimplePageRankVertex extends Vertex<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable> {
- /** Number of supersteps for this test */
- public static final int MAX_SUPERSTEPS = 30;
- /** Logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertex.class);
- /** Sum aggregator name */
- private static String SUM_AGG = "sum";
- /** Min aggregator name */
- private static String MIN_AGG = "min";
- /** Max aggregator name */
- private static String MAX_AGG = "max";
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() >= 1) {
- double sum = 0;
- for (DoubleWritable message : messages) {
- sum += message.get();
- }
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
- setValue(vertexValue);
- aggregate(MAX_AGG, vertexValue);
- aggregate(MIN_AGG, vertexValue);
- aggregate(SUM_AGG, new LongWritable(1));
- LOG.info(getId() + ": PageRank=" + vertexValue +
- " max=" + getAggregatedValue(MAX_AGG) +
- " min=" + getAggregatedValue(MIN_AGG));
- }
-
- if (getSuperstep() < MAX_SUPERSTEPS) {
- long edges = getNumEdges();
- sendMessageToAllEdges(
- new DoubleWritable(getValue().get() / edges));
- } else {
- voteToHalt();
- }
- }
-
- /**
- * Worker context used with {@link SimplePageRankVertex}.
- */
- public static class SimplePageRankVertexWorkerContext extends
- WorkerContext {
- /** Final max value for verification for local jobs */
- private static double FINAL_MAX;
- /** Final min value for verification for local jobs */
- private static double FINAL_MIN;
- /** Final sum value for verification for local jobs */
- private static long FINAL_SUM;
-
- public static double getFinalMax() {
- return FINAL_MAX;
- }
-
- public static double getFinalMin() {
- return FINAL_MIN;
- }
-
- public static long getFinalSum() {
- return FINAL_SUM;
- }
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void postApplication() {
- FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
- FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
- FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
-
- LOG.info("aggregatedNumVertices=" + FINAL_SUM);
- LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
- LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
- }
-
- @Override
- public void preSuperstep() {
- if (getSuperstep() >= 3) {
- LOG.info("aggregatedNumVertices=" +
- getAggregatedValue(SUM_AGG) +
- " NumVertices=" + getTotalNumVertices());
- if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
- getTotalNumVertices()) {
- throw new RuntimeException("wrong value of SumAggreg: " +
- getAggregatedValue(SUM_AGG) + ", should be: " +
- getTotalNumVertices());
- }
- DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
- LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
- DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
- LOG.info("aggregatedMinPageRank=" + minPagerank.get());
- }
- }
-
- @Override
- public void postSuperstep() { }
- }
-
- /**
- * Master compute associated with {@link SimplePageRankVertex}.
- * It registers required aggregators.
- */
- public static class SimplePageRankVertexMasterCompute extends
- DefaultMasterCompute {
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(SUM_AGG, LongSumAggregator.class);
- registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
- registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
- }
- }
-
- /**
- * Simple VertexReader that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexReader extends
- GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertexReader.class);
-
- @Override
- public boolean nextVertex() {
- return totalRecords > recordsRead;
- }
-
- @Override
- public Vertex<LongWritable, DoubleWritable,
- FloatWritable, Writable> getCurrentVertex() throws IOException {
- Vertex<LongWritable, DoubleWritable, FloatWritable, Writable>
- vertex = getConf().createVertex();
- LongWritable vertexId = new LongWritable(
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
- DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
- long targetVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
- edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
- new FloatWritable(edgeValue)));
- vertex.initialize(vertexId, vertexValue, edges);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getId().get() +
- ", vertexValue=" + vertex.getValue() +
- ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
- }
-
- /**
- * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
- @Override
- public VertexReader<LongWritable, DoubleWritable,
- FloatWritable> createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new SimplePageRankVertexReader();
- }
- }
-
- /**
- * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SimplePageRankVertexWriter();
- }
-
- /**
- * Simple VertexWriter that supports {@link SimplePageRankVertex}
- */
- public class SimplePageRankVertexWriter extends TextVertexWriter {
- @Override
- public void writeVertex(
- Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
new file mode 100644
index 0000000..bc39cad
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsComputation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.conf.LongConfOption;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * Demonstrates the basic Pregel shortest paths implementation.
+ */
+@Algorithm(
+ name = "Shortest paths",
+ description = "Finds all shortest paths from a selected vertex"
+)
+public class SimpleShortestPathsComputation extends BasicComputation<
+ LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+ /** Configurable id of the source vertex for the shortest paths */
+ public static final LongConfOption SOURCE_ID =
+ new LongConfOption("SimpleShortestPathsVertex.sourceId", 1);
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleShortestPathsComputation.class);
+
+ /**
+ * Checks whether the given vertex is the configured source vertex.
+ *
+ * @param vertex Vertex whose id is compared against SOURCE_ID
+ * @return True if the vertex id equals the configured source id
+ */
+ private boolean isSource(Vertex<LongWritable, ?, ?> vertex) {
+ return vertex.getId().get() == SOURCE_ID.get(getConf());
+ }
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ if (getSuperstep() == 0) {
+ vertex.setValue(new DoubleWritable(Double.MAX_VALUE));
+ }
+ double minDist = isSource(vertex) ? 0d : Double.MAX_VALUE;
+ for (DoubleWritable message : messages) {
+ minDist = Math.min(minDist, message.get());
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex " + vertex.getId() + " got minDist = " + minDist +
+ " vertex value = " + vertex.getValue());
+ }
+ if (minDist < vertex.getValue().get()) {
+ vertex.setValue(new DoubleWritable(minDist));
+ for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+ double distance = minDist + edge.getValue().get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex " + vertex.getId() + " sent to " +
+ edge.getTargetVertexId() + " = " + distance);
+ }
+ sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
+ }
+ }
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
deleted file mode 100644
index 13d1d7c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.conf.LongConfOption;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates the basic Pregel shortest paths implementation.
- */
-@Algorithm(
- name = "Shortest paths",
- description = "Finds all shortest paths from a selected vertex"
-)
-public class SimpleShortestPathsVertex extends
- Vertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
- /** The shortest paths id */
- public static final LongConfOption SOURCE_ID =
- new LongConfOption("SimpleShortestPathsVertex.sourceId", 1);
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleShortestPathsVertex.class);
-
- /**
- * Is this vertex the source id?
- *
- * @return True if the source id
- */
- private boolean isSource() {
- return getId().get() == SOURCE_ID.get(getConf());
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() == 0) {
- setValue(new DoubleWritable(Double.MAX_VALUE));
- }
- double minDist = isSource() ? 0d : Double.MAX_VALUE;
- for (DoubleWritable message : messages) {
- minDist = Math.min(minDist, message.get());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getId() + " got minDist = " + minDist +
- " vertex value = " + getValue());
- }
- if (minDist < getValue().get()) {
- setValue(new DoubleWritable(minDist));
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
- double distance = minDist + edge.getValue().get();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getId() + " sent to " +
- edge.getTargetVertexId() + " = " + distance);
- }
- sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
- }
- }
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
new file mode 100644
index 0000000..c3fd215
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepComputation.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.io.formats.TextVertexOutputFormat;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+/**
+ * Simple computation that sanity-checks the global vertex/edge counts and
+ * votes to halt once the superstep number is greater than 3.
+ */
+public class SimpleSuperstepComputation extends BasicComputation<LongWritable,
+ IntWritable, FloatWritable, IntWritable> {
+ @Override
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<IntWritable> messages) throws IOException {
+ // Sanity checks used for testing: throw if the global state is illegal
+ if (getTotalNumVertices() < 1) {
+ throw new IllegalStateException("compute: Illegal total vertices " +
+ getTotalNumVertices());
+ }
+ if (getTotalNumEdges() < 0) {
+ throw new IllegalStateException("compute: Illegal total edges " +
+ getTotalNumEdges());
+ }
+ if (vertex.isHalted()) {
+ throw new IllegalStateException("compute: Impossible to be halted - " +
+ vertex.isHalted());
+ }
+
+ if (getSuperstep() > 3) {
+ vertex.voteToHalt();
+ }
+ }
+
+ /**
+ * Simple VertexReader that supports {@link SimpleSuperstepComputation}
+ */
+ public static class SimpleSuperstepVertexReader extends
+ GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleSuperstepVertexReader.class);
+
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return totalRecords > recordsRead;
+ }
+
+ @Override
+ public Vertex<LongWritable, IntWritable, FloatWritable> getCurrentVertex()
+ throws IOException, InterruptedException {
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex =
+ getConf().createVertex();
+ long tmpId = reverseIdOrder ?
+ ((inputSplit.getSplitIndex() + 1) * totalRecords) -
+ recordsRead - 1 :
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
+ LongWritable vertexId = new LongWritable(tmpId);
+ IntWritable vertexValue =
+ new IntWritable((int) (vertexId.get() * 10));
+ List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
+ long targetVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+ new FloatWritable(edgeValue)));
+ vertex.initialize(vertexId, vertexValue, edges);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId +
+ ", edgeValue=" + edgeValue);
+ }
+ return vertex;
+ }
+ }
+
+ /**
+ * Simple VertexInputFormat that supports {@link SimpleSuperstepComputation}
+ */
+ public static class SimpleSuperstepVertexInputFormat extends
+ GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
+ @Override
+ public VertexReader<LongWritable, IntWritable, FloatWritable>
+ createVertexReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ return new SimpleSuperstepVertexReader();
+ }
+ }
+
+
+ /**
+ * Simple VertexOutputFormat that supports {@link SimpleSuperstepComputation}
+ */
+ public static class SimpleSuperstepVertexOutputFormat extends
+ TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
+ @Override
+ public TextVertexWriter createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SimpleSuperstepVertexWriter();
+ }
+
+ /**
+ * VertexWriter that writes each vertex as an (id, value) text record
+ */
+ public class SimpleSuperstepVertexWriter extends TextVertexWriter {
+ @Override
+ public void writeVertex(Vertex<LongWritable, IntWritable,
+ FloatWritable> vertex) throws IOException, InterruptedException {
+ getRecordWriter().write(
+ new Text(vertex.getId().toString()),
+ new Text(vertex.getValue().toString()));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
deleted file mode 100644
index 6f8b352..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.collect.Lists;
-import java.io.IOException;
-import java.util.List;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.edge.EdgeFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-/**
- * Just a simple Vertex compute implementation that executes 3 supersteps, then
- * finishes.
- */
-public class SimpleSuperstepVertex extends Vertex<LongWritable, IntWritable,
- FloatWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) {
- // Some checks for additional testing
- if (getTotalNumVertices() < 1) {
- throw new IllegalStateException("compute: Illegal total vertices " +
- getTotalNumVertices());
- }
- if (getTotalNumEdges() < 0) {
- throw new IllegalStateException("compute: Illegal total edges " +
- getTotalNumEdges());
- }
- if (isHalted()) {
- throw new IllegalStateException("compute: Impossible to be halted - " +
- isHalted());
- }
-
- if (getSuperstep() > 3) {
- voteToHalt();
- }
- }
-
- /**
- * Simple VertexReader that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexReader extends
- GeneratedVertexReader<LongWritable, IntWritable, FloatWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleSuperstepVertexReader.class);
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return totalRecords > recordsRead;
- }
-
- @Override
- public Vertex<LongWritable, IntWritable, FloatWritable,
- Writable> getCurrentVertex()
- throws IOException, InterruptedException {
- Vertex<LongWritable, IntWritable, FloatWritable, Writable> vertex =
- getConf().createVertex();
- long tmpId = reverseIdOrder ?
- ((inputSplit.getSplitIndex() + 1) * totalRecords) -
- recordsRead - 1 :
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
- LongWritable vertexId = new LongWritable(tmpId);
- IntWritable vertexValue =
- new IntWritable((int) (vertexId.get() * 10));
- List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
- long targetVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
- new FloatWritable(edgeValue)));
- vertex.initialize(vertexId, vertexValue, edges);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getId().get() +
- ", vertexValue=" + vertex.getValue() +
- ", targetVertexId=" + targetVertexId +
- ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
- }
-
- /**
- * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable, IntWritable, FloatWritable> {
- @Override
- public VertexReader<LongWritable, IntWritable, FloatWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- return new SimpleSuperstepVertexReader();
- }
- }
-
-
- /**
- * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SimpleSuperstepVertexWriter();
- }
-
- /**
- * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
- */
- public class SimpleSuperstepVertexWriter extends TextVertexWriter {
- @Override
- public void writeVertex(Vertex<LongWritable, IntWritable,
- FloatWritable, ?> vertex) throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
index 157e6ef..b1ee2a0 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
@@ -39,7 +39,7 @@ public class SimpleTextVertexOutputFormat extends
private class SimpleTextVertexWriter extends TextVertexWriter {
@Override
public void writeVertex(
- Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex)
throws IOException, InterruptedException {
getRecordWriter().write(
new Text(vertex.getId().toString()),
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
new file mode 100644
index 0000000..8608d02
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingComputation.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.utils.ArrayListWritable;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Demonstrates triangle closing in simple,
+ * unweighted graphs for Giraph.
+ *
+ * Triangle Closing: Vertex A and B maintain out-edges to C and D
+ * The algorithm, when finished, populates all vertices' value with an
+ * array of Writables representing all the vertices that each
+ * should form an out-edge to (connect with, if this is a social
+ * graph.)
+ * In this example, vertices A and B would hold empty arrays
+ * since they are already connected with C and D. Results:
+ * If the graph is undirected, C would hold value D, and D would
+ * hold value C, since both are neighbors of A and B and yet both
+ * were not previously connected to each other.
+ *
+ * In a social graph, the result values for vertex X would represent people
+ * that are likely a part of a person X's social circle (they know one or more
+ * people X is connected to already) but X had not previously met them yet.
+ * Given this new information, X can decide to connect to vertices (people) in
+ * the result array or not.
+ *
+ * Results at each vertex are ordered in terms of the # of neighbors
+ * who are connected to each vertex listed in the final vertex value.
+ * The more of a vertex's neighbors who "know" someone, the stronger
+ * your social relationship is presumed to be to that vertex (assuming
+ * a social graph) and the more likely you should connect with them.
+ *
+ * In this implementation, Edge Values are not used, but could be
+ * adapted to represent additional qualities that could affect the
+ * ordering of the final result array.
+ *
+ * This class keeps no state between calls to compute(): the per-vertex
+ * tally is a local variable, so results for one vertex never depend on
+ * which other vertices the same Computation object has already processed.
+ */
+public class SimpleTriangleClosingComputation extends BasicComputation<
+    IntWritable, SimpleTriangleClosingComputation.IntArrayListWritable,
+    NullWritable, IntWritable> {
+
+  /**
+   * Superstep 0: for every out-edge, sends that edge's target id to all
+   * neighbors. Later supersteps: tallies the received ids, ranks them and
+   * stores the ranked ids as the vertex value. Always votes to halt.
+   *
+   * @param vertex Vertex being computed
+   * @param messages Vertex ids received from neighbors
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void compute(
+      Vertex<IntWritable, IntArrayListWritable, NullWritable> vertex,
+      Iterable<IntWritable> messages) throws IOException {
+    if (getSuperstep() == 0) {
+      // send list of this vertex's neighbors to all neighbors
+      for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+        sendMessageToAllEdges(vertex, edge.getTargetVertexId());
+      }
+    } else {
+      // Vertices to close the triangle, ranked by frequency of in-msgs.
+      // Deliberately local: a field on the Computation would carry one
+      // vertex's tally over into the next vertex computed by this object.
+      Map<IntWritable, Integer> closeMap =
+          Maps.<IntWritable, Integer>newHashMap();
+      for (IntWritable message : messages) {
+        Integer seen = closeMap.get(message);
+        // the first sighting scores 0, every further sighting adds 1
+        int current = (seen == null) ? 0 : seen + 1;
+        // NOTE(review): the key is copied in case the framework reuses
+        // message objects while iterating -- confirm whether it does.
+        closeMap.put(new IntWritable(message.get()), current);
+      }
+      // make sure the result values are sorted and
+      // packaged in an IntArrayListWritable for output
+      Set<Pair> sortedResults = Sets.<Pair>newTreeSet();
+      for (Map.Entry<IntWritable, Integer> entry : closeMap.entrySet()) {
+        sortedResults.add(new Pair(entry.getKey(), entry.getValue()));
+      }
+      IntArrayListWritable outputList = new IntArrayListWritable();
+      for (Pair pair : sortedResults) {
+        if (pair.value > 0) {
+          outputList.add(pair.key);
+        } else {
+          // sorted by descending count, so nothing after this qualifies
+          break;
+        }
+      }
+      vertex.setValue(outputList);
+    }
+    vertex.voteToHalt();
+  }
+
+  /** Quick, immutable K,V storage for sorting in tree set */
+  public static class Pair implements Comparable<Pair> {
+    /** The IntWritable key */
+    private final IntWritable key;
+    /** The Integer value */
+    private final Integer value;
+
+    /**
+     * Constructor
+     *
+     * @param k the key
+     * @param v the value
+     */
+    public Pair(IntWritable k, Integer v) {
+      key = k;
+      value = v;
+    }
+
+    /**
+     * key getter
+     *
+     * @return the key
+     */
+    public IntWritable getKey() {
+      return key;
+    }
+
+    /**
+     * value getter
+     *
+     * @return the value
+     */
+    public Integer getValue() {
+      return value;
+    }
+
+    /**
+     * Orders by descending value; pairs with the same value are ordered by
+     * ascending key. The tie-break matters: a sorted set treats elements
+     * that compare as 0 as duplicates, so comparing on value alone would
+     * drop every pair but one for each distinct value.
+     *
+     * @param other the Pair to compare with THIS
+     * @return the comparison value as an integer
+     */
+    @Override
+    public int compareTo(Pair other) {
+      int byValue = compareInts(other.value, this.value);
+      if (byValue != 0) {
+        return byValue;
+      }
+      return compareInts(this.key.get(), other.key.get());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof Pair)) {
+        return false;
+      }
+      Pair that = (Pair) obj;
+      return value.equals(that.value) && key.get() == that.key.get();
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * key.get() + value.hashCode();
+    }
+
+    /**
+     * Three-way int comparison that cannot overflow (unlike subtraction).
+     *
+     * @param a left operand
+     * @param b right operand
+     * @return -1, 0 or 1 as a is less than, equal to or greater than b
+     */
+    private static int compareInts(int a, int b) {
+      if (a < b) {
+        return -1;
+      }
+      return (a > b) ? 1 : 0;
+    }
+  }
+
+  /** Utility class for delivering the array of vertices THIS vertex
+   * should connect with to close triangles with neighbors */
+  public static class IntArrayListWritable
+      extends ArrayListWritable<IntWritable> {
+    /** Default constructor for reflection */
+    public IntArrayListWritable() {
+      super();
+    }
+
+    /** Set storage type for this ArrayListWritable */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void setClass() {
+      setClass(IntWritable.class);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
deleted file mode 100644
index f44cb18..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.ArrayListWritable;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Demonstrates triangle closing in simple,
- * unweighted graphs for Giraph.
- *
- * Triangle Closing: Vertex A and B maintain out-edges to C and D
- * The algorithm, when finished, populates all vertices' value with an
- * array of Writables representing all the vertices that each
- * should form an out-edge to (connect with, if this is a social
- * graph.)
- * In this example, vertices A and B would hold empty arrays
- * since they are already connected with C and D. Results:
- * If the graph is undirected, C would hold value, D and D would
- * hold value C, since both are neighbors of A and B and yet both
- * were not previously connected to each other.
- *
- * In a social graph, the result values for vertex X would represent people
- * that are likely a part of a person X's social circle (they know one or more
- * people X is connected to already) but X had not previously met them yet.
- * Given this new information, X can decide to connect to vertices (peoople) in
- * the result array or not.
- *
- * Results at each vertex are ordered in terms of the # of neighbors
- * who are connected to each vertex listed in the final vertex value.
- * The more of a vertex's neighbors who "know" someone, the stronger
- * your social relationship is presumed to be to that vertex (assuming
- * a social graph) and the more likely you should connect with them.
- *
- * In this implementation, Edge Values are not used, but could be
- * adapted to represent additional qualities that could affect the
- * ordering of the final result array.
- */
-public class SimpleTriangleClosingVertex extends Vertex<
- IntWritable, SimpleTriangleClosingVertex.IntArrayListWritable,
- NullWritable, IntWritable> {
- /** Vertices to close the triangle, ranked by frequency of in-msgs */
- private Map<IntWritable, Integer> closeMap =
- Maps.<IntWritable, Integer>newHashMap();
-
- @Override
- public void compute(Iterable<IntWritable> messages) {
- if (getSuperstep() == 0) {
- // send list of this vertex's neighbors to all neighbors
- for (Edge<IntWritable, NullWritable> edge : getEdges()) {
- sendMessageToAllEdges(edge.getTargetVertexId());
- }
- } else {
- for (IntWritable message : messages) {
- final int current = (closeMap.get(message) == null) ?
- 0 : closeMap.get(message) + 1;
- closeMap.put(message, current);
- }
- // make sure the result values are sorted and
- // packaged in an IntArrayListWritable for output
- Set<SimpleTriangleClosingVertex.Pair> sortedResults =
- Sets.<SimpleTriangleClosingVertex.Pair>newTreeSet();
- for (Map.Entry<IntWritable, Integer> entry : closeMap.entrySet()) {
- sortedResults.add(new Pair(entry.getKey(), entry.getValue()));
- }
- SimpleTriangleClosingVertex.IntArrayListWritable
- outputList = new SimpleTriangleClosingVertex.IntArrayListWritable();
- for (SimpleTriangleClosingVertex.Pair pair : sortedResults) {
- if (pair.value > 0) {
- outputList.add(pair.key);
- } else {
- break;
- }
- }
- setValue(outputList);
- }
- voteToHalt();
- }
-
- /** Quick, immutable K,V storage for sorting in tree set */
- public static class Pair implements Comparable<Pair> {
- /** key
- * @param key the IntWritable key */
- private final IntWritable key;
- /** value
- * @param value the Integer value */
- private final Integer value;
- /** Constructor
- * @param k the key
- * @param v the value
- */
- public Pair(IntWritable k, Integer v) {
- key = k;
- value = v;
- }
- /** key getter
- * @return the key */
- public IntWritable getKey() { return key; }
- /** value getter
- * @return the value */
- public Integer getValue() { return value; }
- /** Comparator to quickly sort by values
- * @param other the Pair to compare with THIS
- * @return the comparison value as an integer */
- @Override
- public int compareTo(Pair other) {
- return other.value - this.value;
- }
- }
-
- /** Utility class for delivering the array of vertices THIS vertex
- * should connect with to close triangles with neighbors */
- public static class IntArrayListWritable
- extends ArrayListWritable<IntWritable> {
- /** Default constructor for reflection */
- public IntArrayListWritable() {
- super();
- }
- /** Set storage type for this ArrayListWritable */
- @Override
- @SuppressWarnings("unchecked")
- public void setClass() {
- setClass(IntWritable.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
index 8a6f775..a5051b8 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
@@ -18,8 +18,8 @@
package org.apache.giraph.examples;
-import org.apache.giraph.examples.SimpleSuperstepVertex.
- SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.worker.WorkerContext;
@@ -63,21 +63,21 @@ public class SimpleVertexWithWorkerContext implements Tool {
/**
* Actual vetex implementation
*/
- public static class SimpleVertex extends
- Vertex<LongWritable, IntWritable, FloatWritable,
- DoubleWritable> {
+ public static class SimpleComputation extends BasicComputation<LongWritable,
+ IntWritable, FloatWritable, DoubleWritable> {
@Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
long superstep = getSuperstep();
if (superstep < TESTLENGTH) {
- EmitterWorkerContext emitter =
- (EmitterWorkerContext) getWorkerContext();
- emitter.emit("vertexId=" + getId() +
+ EmitterWorkerContext emitter = getWorkerContext();
+ emitter.emit("vertexId=" + vertex.getId() +
" superstep=" + superstep + "\n");
} else {
- voteToHalt();
+ vertex.voteToHalt();
}
}
}
@@ -169,7 +169,7 @@ public class SimpleVertexWithWorkerContext implements Tool {
"run: Must have 2 arguments <output path> <# of workers>");
}
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setVertexClass(SimpleVertex.class);
+ job.getConfiguration().setComputationClass(SimpleComputation.class);
job.getConfiguration().setVertexInputFormatClass(
SimpleSuperstepVertexInputFormat.class);
job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
new file mode 100644
index 0000000..ad72951
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/TestComputationStateComputation.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Computation to test the local variables in Computation, and
+ * pre/postSuperstep methods
+ */
+public class TestComputationStateComputation extends BasicComputation<
+    LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
+  /** How many compute threads to use in the test */
+  public static final int NUM_COMPUTE_THREADS = 10;
+  /** How many vertices to create for the test */
+  public static final int NUM_VERTICES = 100;
+  /** How many partitions to have */
+  public static final int NUM_PARTITIONS = 25;
+
+  /**
+   * The counter should hold the number of vertices in this partition,
+   * plus the current superstep. It is seeded in preSuperstep(), bumped once
+   * per compute() call and reported in postSuperstep().
+   */
+  private long counter;
+
+  /**
+   * Counts the call and votes to halt once the superstep number exceeds 5.
+   *
+   * @param vertex Vertex being computed
+   * @param messages Incoming messages (ignored)
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void compute(
+      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+      Iterable<DoubleWritable> messages) throws IOException {
+    counter++;
+    if (getSuperstep() > 5) {
+      vertex.voteToHalt();
+    }
+  }
+
+  /** Seeds the counter with the superstep number held by the context. */
+  @Override
+  public void preSuperstep() {
+    counter =
+        ((TestComputationStateWorkerContext) getWorkerContext())
+            .superstepCounter;
+  }
+
+  /** Adds this computation's counter to the context-wide total. */
+  @Override
+  public void postSuperstep() {
+    ((TestComputationStateWorkerContext) getWorkerContext()).totalCounter
+        .addAndGet(counter);
+  }
+
+  /**
+   * WorkerContext for TestComputationState
+   */
+  public static class TestComputationStateWorkerContext extends
+      DefaultWorkerContext {
+    /** Current superstep */
+    private long superstepCounter;
+    /**
+     * This counter should hold the sum of Computation's counters
+     */
+    private AtomicLong totalCounter;
+
+    /** Records the superstep number and starts a fresh total. */
+    @Override
+    public void preSuperstep() {
+      superstepCounter = getSuperstep();
+      totalCounter = new AtomicLong(0);
+    }
+
+    /**
+     * Verifies the accumulated total: every counter starts at the superstep
+     * number and gains one per computed vertex, and the check expects
+     * NUM_PARTITIONS such counters to have reported.
+     */
+    @Override
+    public void postSuperstep() {
+      // expected value first, measured value second, so that a failure
+      // message labels the two numbers correctly
+      assertEquals(
+          NUM_PARTITIONS * superstepCounter + getTotalNumVertices(),
+          totalCounter.get());
+    }
+  }
+
+  /**
+   * Throws exception if values are not equal.
+   *
+   * @param expected Expected value
+   * @param actual Actual value
+   */
+  private static void assertEquals(long expected, long actual) {
+    if (expected != actual) {
+      throw new RuntimeException("expected: " + expected +
+          ", actual: " + actual);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
index 6e3c589..bcb02cf 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VerifyMessage.java
@@ -19,6 +19,7 @@
package org.apache.giraph.examples;
import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.master.DefaultMasterCompute;
@@ -41,7 +42,7 @@ import java.io.IOException;
*/
public class VerifyMessage {
/**
- * Message that will be sent in {@link VerifyMessageVertex}.
+ * Message that will be sent in {@link VerifyMessageComputation}.
*/
public static class VerifiableMessage implements Writable {
/** Superstep sent on */
@@ -93,9 +94,9 @@ public class VerifyMessage {
/**
* Send and verify messages.
*/
- public static class VerifyMessageVertex extends
- Vertex<LongWritable, IntWritable, FloatWritable,
- VerifiableMessage> {
+ public static class VerifyMessageComputation extends
+ BasicComputation<LongWritable, IntWritable, FloatWritable,
+ VerifiableMessage> {
/** Dynamically set number of SUPERSTEPS */
public static final String SUPERSTEP_COUNT =
"verifyMessageVertex.superstepCount";
@@ -104,14 +105,15 @@ public class VerifyMessage {
/** Number of SUPERSTEPS to run (6 by default) */
private static int SUPERSTEPS = 6;
/** Class logger */
- private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
+ private static Logger LOG =
+ Logger.getLogger(VerifyMessageComputation.class);
public static long getFinalSum() {
return FINAL_SUM;
}
/**
- * Worker context used with {@link VerifyMessageVertex}.
+ * Worker context used with {@link VerifyMessageComputation}.
*/
public static class VerifyMessageVertexWorkerContext extends
WorkerContext {
@@ -138,28 +140,30 @@ public class VerifyMessage {
}
@Override
- public void compute(Iterable<VerifiableMessage> messages) {
+ public void compute(
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex,
+ Iterable<VerifiableMessage> messages) throws IOException {
String sumAggregatorName = LongSumAggregator.class.getName();
if (getSuperstep() > SUPERSTEPS) {
- voteToHalt();
+ vertex.voteToHalt();
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
}
- aggregate(sumAggregatorName, new LongWritable(getId().get()));
+ aggregate(sumAggregatorName, new LongWritable(vertex.getId().get()));
if (LOG.isDebugEnabled()) {
LOG.debug("compute: sum = " +
this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
- " for vertex " + getId());
+ " for vertex " + vertex.getId());
}
float msgValue = 0.0f;
for (VerifiableMessage message : messages) {
msgValue += message.value;
if (LOG.isDebugEnabled()) {
LOG.debug("compute: got msg = " + message +
- " for vertex id " + getId() +
- ", vertex value " + getValue() +
+ " for vertex id " + vertex.getId() +
+ ", vertex value " + vertex.getValue() +
" on superstep " + getSuperstep());
}
if (message.superstep != getSuperstep() - 1) {
@@ -168,44 +172,44 @@ public class VerifyMessage {
"the previous superstep, current superstep = " +
getSuperstep());
}
- if ((message.sourceVertexId != getId().get() - 1) &&
- (getId().get() != 0)) {
+ if ((message.sourceVertexId != vertex.getId().get() - 1) &&
+ (vertex.getId().get() != 0)) {
throw new IllegalStateException(
"compute: Impossible that this message didn't come " +
"from the previous vertex and came from " +
message.sourceVertexId);
}
}
- int vertexValue = getValue().get();
- setValue(new IntWritable(vertexValue + (int) msgValue));
+ int vertexValue = vertex.getValue().get();
+ vertex.setValue(new IntWritable(vertexValue + (int) msgValue));
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getId() +
- " has value " + getValue() +
+ LOG.debug("compute: vertex " + vertex.getId() +
+ " has value " + vertex.getValue() +
" on superstep " + getSuperstep());
}
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
+ for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
FloatWritable newEdgeValue = new FloatWritable(
edge.getValue().get() + (float) vertexValue);
Edge<LongWritable, FloatWritable> newEdge =
EdgeFactory.create(edge.getTargetVertexId(), newEdgeValue);
if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getId() +
+ LOG.debug("compute: vertex " + vertex.getId() +
" sending edgeValue " + edge.getValue() +
" vertexValue " + vertexValue +
" total " + newEdgeValue +
" to vertex " + edge.getTargetVertexId() +
" on superstep " + getSuperstep());
}
- addEdge(newEdge);
+ vertex.addEdge(newEdge);
sendMessage(edge.getTargetVertexId(),
new VerifiableMessage(
- getSuperstep(), getId().get(), newEdgeValue.get()));
+ getSuperstep(), vertex.getId().get(), newEdgeValue.get()));
}
}
}
/**
- * Master compute associated with {@link VerifyMessageVertex}.
+ * Master compute associated with {@link VerifyMessageComputation}.
* It registers required aggregators.
*/
public static class VerifyMessageMasterCompute extends
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
index d328153..3d06561 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueDoubleEdgeTextOutputFormat.java
@@ -44,7 +44,7 @@ public class VertexWithDoubleValueDoubleEdgeTextOutputFormat extends
public class VertexWithDoubleValueWriter extends TextVertexWriter {
@Override
public void writeVertex(
- Vertex<LongWritable, DoubleWritable, DoubleWritable, ?> vertex)
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex)
throws IOException, InterruptedException {
StringBuilder output = new StringBuilder();
output.append(vertex.getId().get());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
index 85f3556..8cc769f 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueNullEdgeTextOutputFormat.java
@@ -47,7 +47,7 @@ public class VertexWithDoubleValueNullEdgeTextOutputFormat extends
public class VertexWithDoubleValueWriter extends TextVertexWriter {
@Override
public void writeVertex(
- Vertex<LongWritable, DoubleWritable, NullWritable, ?> vertex)
+ Vertex<LongWritable, DoubleWritable, NullWritable> vertex)
throws IOException, InterruptedException {
StringBuilder output = new StringBuilder();
output.append(vertex.getId().get());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
index 652913b..99ba770 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
@@ -20,9 +20,9 @@ package org.apache.giraph;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
@@ -58,15 +58,15 @@ public class TestAutoCheckpoint extends BspCase {
}
Path outputPath = getTempPath(getCallingMethodName());
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
+ conf.setBoolean(SimpleCheckpoint.ENABLE_FAULT, true);
conf.setInt("mapred.map.max.attempts", 4);
// Trigger failure faster
conf.setInt("mapred.task.timeout", 10000);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 38ba27f..28edbba 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -24,21 +24,22 @@ import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.examples.SimpleCombinerVertex;
-import org.apache.giraph.examples.SimpleFailVertex;
-import org.apache.giraph.examples.SimpleMasterComputeVertex;
-import org.apache.giraph.examples.SimpleMsgVertex;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
-import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCombinerComputation;
+import org.apache.giraph.examples.SimpleFailComputation;
+import org.apache.giraph.examples.SimpleMasterComputeComputation;
+import org.apache.giraph.examples.SimpleMsgComputation;
+import org.apache.giraph.examples.SimplePageRankComputation;
+import org.apache.giraph.examples.SimplePageRankComputation.SimplePageRankVertexInputFormat;
+import org.apache.giraph.examples.SimpleShortestPathsComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.job.HadoopUtils;
+import org.apache.giraph.utils.NoOpComputation;
import org.apache.giraph.worker.InputSplitPathOrganizer;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.conf.Configuration;
@@ -111,13 +112,12 @@ public class
System.out.println("testInstantiateVertex: java.class.path=" +
System.getProperty("java.class.path"));
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleSuperstepVertex.class);
- conf.setVertexInputFormatClass(
- SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
+ conf.setComputationClass(SimpleSuperstepComputation.class);
+ conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
ImmutableClassesGiraphConfiguration configuration =
new ImmutableClassesGiraphConfiguration(job.getConfiguration());
- Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
+ Vertex<LongWritable, IntWritable, FloatWritable> vertex =
configuration.createVertex();
vertex.initialize(new LongWritable(1), new IntWritable(1));
System.out.println("testInstantiateVertex: Got vertex " + vertex);
@@ -133,11 +133,8 @@ public class
byteArrayOutputStream.toString());
}
- private static class NullVertex extends Vertex<NullWritable, NullWritable,
- NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException { }
- }
+ private static class NullComputation extends NoOpComputation<NullWritable,
+ NullWritable, NullWritable, NullWritable> { }
/**
* Test whether vertices with NullWritable for vertex value type, edge value
@@ -146,19 +143,17 @@ public class
@Test
public void testInstantiateNullVertex() throws IOException {
GiraphConfiguration nullConf = new GiraphConfiguration();
- nullConf.setVertexClass(NullVertex.class);
- ImmutableClassesGiraphConfiguration<
- NullWritable, NullWritable, NullWritable,
+ nullConf.setComputationClass(NullComputation.class);
+ ImmutableClassesGiraphConfiguration<NullWritable, NullWritable,
NullWritable> immutableClassesGiraphConfiguration =
new ImmutableClassesGiraphConfiguration<
- NullWritable, NullWritable, NullWritable, NullWritable>(
- nullConf);
+ NullWritable, NullWritable, NullWritable>(nullConf);
NullWritable vertexValue =
immutableClassesGiraphConfiguration.createVertexValue();
NullWritable edgeValue =
immutableClassesGiraphConfiguration.createEdgeValue();
NullWritable messageValue =
- immutableClassesGiraphConfiguration.createMessageValue();
+ immutableClassesGiraphConfiguration.createOutgoingMessageValue();
assertSame(vertexValue.getClass(), NullWritable.class);
assertSame(vertexValue, edgeValue);
assertSame(edgeValue, messageValue);
@@ -180,7 +175,7 @@ public class
return;
}
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleSuperstepVertex.class);
+ conf.setComputationClass(SimpleSuperstepComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
conf = job.getConfiguration();
@@ -222,7 +217,7 @@ public class
}
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleFailVertex.class);
+ conf.setComputationClass(SimpleFailComputation.class);
conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf,
getTempPath(getCallingMethodName()));
@@ -243,7 +238,7 @@ public class
String callingMethod = getCallingMethodName();
Path outputPath = getTempPath(callingMethod);
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleSuperstepVertex.class);
+ conf.setComputationClass(SimpleSuperstepComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob job = prepareJob(callingMethod, conf, outputPath);
@@ -268,7 +263,7 @@ public class
public void testBspMsg()
throws IOException, InterruptedException, ClassNotFoundException {
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleMsgVertex.class);
+ conf.setComputationClass(SimpleMsgComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
@@ -287,7 +282,7 @@ public class
public void testEmptyVertexInputFormat()
throws IOException, InterruptedException, ClassNotFoundException {
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleMsgVertex.class);
+ conf.setComputationClass(SimpleMsgComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
@@ -305,7 +300,7 @@ public class
public void testBspCombiner()
throws IOException, InterruptedException, ClassNotFoundException {
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleCombinerVertex.class);
+ conf.setComputationClass(SimpleCombinerComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setCombinerClass(SimpleSumCombiner.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
@@ -366,11 +361,11 @@ public class
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath(getCallingMethodName());
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleShortestPathsVertex.class);
+ conf.setComputationClass(SimpleShortestPathsComputation.class);
conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
conf.setVertexOutputFormatClass(
JsonLongDoubleFloatDoubleVertexOutputFormat.class);
- SimpleShortestPathsVertex.SOURCE_ID.set(conf, 0);
+ SimpleShortestPathsComputation.SOURCE_ID.set(conf, 0);
GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
assertTrue(job.run(true));
@@ -394,15 +389,15 @@ public class
Path outputPath = getTempPath(getCallingMethodName());
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimplePageRankVertex.class);
+ conf.setComputationClass(SimplePageRankComputation.class);
conf.setAggregatorWriterClass(TextAggregatorWriter.class);
conf.setMasterComputeClass(
- SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
+ SimplePageRankComputation.SimplePageRankMasterCompute.class);
conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
conf.setVertexOutputFormatClass(
- SimplePageRankVertex.SimplePageRankVertexOutputFormat.class);
+ SimplePageRankComputation.SimplePageRankVertexOutputFormat.class);
conf.setWorkerContextClass(
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
+ SimplePageRankComputation.SimplePageRankWorkerContext.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
GiraphConfiguration configuration = job.getConfiguration();
Path aggregatorValues = getTempPath("aggregatorValues");
@@ -419,11 +414,11 @@ public class
try {
if (!runningInDistributedMode()) {
double maxPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
+ SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMax();
double minPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
+ SimplePageRankComputation.SimplePageRankWorkerContext.getFinalMin();
long numVertices =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
+ SimplePageRankComputation.SimplePageRankWorkerContext.getFinalSum();
System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
" minPageRank=" + minPageRank + " numVertices=" + numVertices);
@@ -455,7 +450,7 @@ public class
}
}
- int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
+ int maxSuperstep = SimplePageRankComputation.MAX_SUPERSTEPS;
assertEquals(maxSuperstep + 2, minValues.size());
assertEquals(maxSuperstep + 2, maxValues.size());
assertEquals(maxSuperstep + 2, vertexCounts.size());
@@ -485,17 +480,17 @@ public class
public void testBspMasterCompute()
throws IOException, InterruptedException, ClassNotFoundException {
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(SimpleMasterComputeVertex.class);
+ conf.setComputationClass(SimpleMasterComputeComputation.class);
conf.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
conf.setMasterComputeClass(
- SimpleMasterComputeVertex.SimpleMasterCompute.class);
+ SimpleMasterComputeComputation.SimpleMasterCompute.class);
conf.setWorkerContextClass(
- SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
+ SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
double finalSum =
- SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.getFinalSum();
+ SimpleMasterComputeComputation.SimpleMasterComputeWorkerContext.getFinalSum();
System.out.println("testBspMasterCompute: finalSum=" + finalSum);
assertEquals(32.5, finalSum, 0d);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
new file mode 100644
index 0000000..1092eac
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankComputation;
+import org.apache.giraph.examples.TestComputationStateComputation;
+import org.apache.giraph.job.GiraphJob;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+/** Runs TestComputationStateComputation as a Giraph job (local mode only). */
+public class TestComputationState extends BspCase {
+  public TestComputationState() {
+    super(TestComputationState.class.getName());
+  }
+
+  @Test
+  public void testComputationState() throws IOException,
+      ClassNotFoundException, InterruptedException {
+    if (runningInDistributedMode()) {
+      System.out.println(
+          "testComputationState: Ignore this test in distributed mode.");
+      return;
+    }
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(TestComputationStateComputation.class);
+    conf.setVertexInputFormatClass(
+        SimplePageRankComputation.SimplePageRankVertexInputFormat.class);
+    conf.setWorkerContextClass(
+        TestComputationStateComputation.TestComputationStateWorkerContext.class);
+    GiraphJob job = prepareJob(getCallingMethodName(), conf);
+    // Compute with the thread count declared by the computation under test
+    job.getConfiguration().setNumComputeThreads(
+        TestComputationStateComputation.NUM_COMPUTE_THREADS);
+    // Size the generated input from the computation's NUM_VERTICES constant
+    job.getConfiguration().setInt(GeneratedVertexReader.READER_VERTICES,
+        TestComputationStateComputation.NUM_VERTICES);
+    // Set the user partition count from the computation's NUM_PARTITIONS
+    GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),
+        TestComputationStateComputation.NUM_PARTITIONS);
+    assertTrue(job.run(true));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 12f0d8d..4537eac 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -21,9 +21,9 @@ package org.apache.giraph;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.partition.HashRangePartitionerFactory;
@@ -76,12 +76,12 @@ public class TestGraphPartitioner extends BspCase {
throws IOException, InterruptedException, ClassNotFoundException {
Path outputPath = getTempPath("testVertexBalancer");
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob job = prepareJob("testVertexBalancer", conf, outputPath);
@@ -94,12 +94,12 @@ public class TestGraphPartitioner extends BspCase {
FileSystem hdfs = FileSystem.get(job.getConfiguration());
conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
outputPath = getTempPath("testHashPartitioner");
@@ -109,12 +109,12 @@ public class TestGraphPartitioner extends BspCase {
outputPath = getTempPath("testSuperstepHashPartitioner");
conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
job = prepareJob("testSuperstepHashPartitioner", conf, outputPath);
@@ -127,12 +127,12 @@ public class TestGraphPartitioner extends BspCase {
job = new GiraphJob("testHashRangePartitioner");
setupConfiguration(job);
- job.getConfiguration().setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ job.getConfiguration().setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
job.getConfiguration().setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
job.getConfiguration().setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
job.getConfiguration().setVertexInputFormatClass(
SimpleSuperstepVertexInputFormat.class);
job.getConfiguration().setVertexOutputFormatClass(
@@ -146,12 +146,12 @@ public class TestGraphPartitioner extends BspCase {
outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
job = prepareJob("testReverseIdSuperstepHashPartitioner", conf,
@@ -165,12 +165,12 @@ public class TestGraphPartitioner extends BspCase {
job = new GiraphJob("testSimpleRangePartitioner");
setupConfiguration(job);
- job.getConfiguration().setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ job.getConfiguration().setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
job.getConfiguration().setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
job.getConfiguration().setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
job.getConfiguration().setVertexInputFormatClass(
SimpleSuperstepVertexInputFormat.class);
job.getConfiguration().setVertexOutputFormatClass(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
index 766e1af..fc94480 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestManualCheckpoint.java
@@ -20,9 +20,9 @@ package org.apache.giraph;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
+import org.apache.giraph.examples.SimpleCheckpoint;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
+import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -54,12 +54,12 @@ public class TestManualCheckpoint extends BspCase {
Path checkpointsDir = getTempPath("checkPointsForTesting");
Path outputPath = getTempPath(getCallingMethodName());
GiraphConfiguration conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf, outputPath);
@@ -75,7 +75,7 @@ public class TestManualCheckpoint extends BspCase {
if (!runningInDistributedMode()) {
FileStatus fileStatus = getSinglePartFileStatus(job.getConfiguration(),
outputPath);
- idSum = SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+ idSum = SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
.getFinalSum();
System.out.println("testBspCheckpoint: idSum = " + idSum +
" fileLen = " + fileStatus.getLen());
@@ -86,25 +86,25 @@ public class TestManualCheckpoint extends BspCase {
" with checkpoint path = " + checkpointsDir);
outputPath = getTempPath(getCallingMethodName() + "Restarted");
conf = new GiraphConfiguration();
- conf.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
+ conf.setComputationClass(
+ SimpleCheckpoint.SimpleCheckpointComputation.class);
conf.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
conf.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
GiraphJob restartedJob = prepareJob(getCallingMethodName() + "Restarted",
conf, outputPath);
configuration.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
+ SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
GiraphConstants.CHECKPOINT_DIRECTORY.set(restartedJob.getConfiguration(),
checkpointsDir.toString());
assertTrue(restartedJob.run(true));
if (!runningInDistributedMode()) {
long idSumRestarted =
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext
+ SimpleCheckpoint.SimpleCheckpointVertexWorkerContext
.getFinalSum();
System.out.println("testBspCheckpoint: idSumRestarted = " +
idSumRestarted);