You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/02/10 03:13:21 UTC
[5/6] GIRAPH-470 (tavoaqp via nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
deleted file mode 100644
index d3bd33d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.DefaultEdge;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-
-/**
- * Vertex to allow unit testing of graph mutations.
- */
-public class SimpleMutateGraphVertex extends EdgeListVertex<
- LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
- /** Class logger */
- private static Logger LOG =
- Logger.getLogger(SimpleMutateGraphVertex.class);
- /** Maximum number of ranges for vertex ids */
- private long maxRanges = 100;
-
-
- /**
- * Unless we create a ridiculous number of vertices , we should not
- * collide within a vertex range defined by this method.
- *
- * @param range Range index
- * @return Starting vertex id of the range
- */
- private long rangeVertexIdStart(int range) {
- return (Long.MAX_VALUE / maxRanges) * range;
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages)
- throws IOException {
- SimpleMutateGraphVertexWorkerContext workerContext =
- (SimpleMutateGraphVertexWorkerContext) getWorkerContext();
- if (getSuperstep() == 0) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 1) {
- // Send messages to vertices that are sure not to exist
- // (creating them)
- LongWritable destVertexId =
- new LongWritable(rangeVertexIdStart(1) + getId().get());
- sendMessage(destVertexId, new DoubleWritable(0.0));
- } else if (getSuperstep() == 2) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 3) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " vertices when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + edgeCount +
- " on superstep " + getSuperstep());
- }
- // Create vertices that are sure not to exist (doubling vertices)
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getId().get());
- addVertexRequest(vertexIndex, new DoubleWritable(0.0));
- // Add edges to those remote vertices as well
- addEdgeRequest(vertexIndex,
- new DefaultEdge<LongWritable, FloatWritable>(
- getId(), new FloatWritable(0.0f)));
- } else if (getSuperstep() == 4) {
- LOG.debug("Reached superstep " + getSuperstep());
- } else if (getSuperstep() == 5) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount * 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " when should have " + vertexCount * 2 +
- " on superstep " + getSuperstep());
- }
- long edgeCount = workerContext.getEdgeCount();
- if (edgeCount + vertexCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + edgeCount + vertexCount +
- " on superstep " + getSuperstep());
- }
- // Remove the edges created in superstep 3
- LongWritable vertexIndex =
- new LongWritable(rangeVertexIdStart(3) + getId().get());
- workerContext.increaseEdgesRemoved();
- removeEdgesRequest(vertexIndex, getId());
- } else if (getSuperstep() == 6) {
- // Remove all the vertices created in superstep 3
- if (getId().compareTo(
- new LongWritable(rangeVertexIdStart(3))) >= 0) {
- removeVertexRequest(getId());
- }
- } else if (getSuperstep() == 7) {
- long origEdgeCount = workerContext.getOrigEdgeCount();
- if (origEdgeCount != getTotalNumEdges()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumEdges() +
- " edges when should have " + origEdgeCount +
- " on superstep " + getSuperstep());
- }
- } else if (getSuperstep() == 8) {
- long vertexCount = workerContext.getVertexCount();
- if (vertexCount / 2 != getTotalNumVertices()) {
- throw new IllegalStateException(
- "Impossible to have " + getTotalNumVertices() +
- " vertices when should have " + vertexCount / 2 +
- " on superstep " + getSuperstep());
- }
- } else {
- voteToHalt();
- }
- }
-
- /**
- * Worker context used with {@link SimpleMutateGraphVertex}.
- */
- public static class SimpleMutateGraphVertexWorkerContext
- extends WorkerContext {
- /** Cached vertex count */
- private long vertexCount;
- /** Cached edge count */
- private long edgeCount;
- /** Original number of edges */
- private long origEdgeCount;
- /** Number of edges removed during superstep */
- private int edgesRemoved = 0;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException { }
-
- @Override
- public void postApplication() { }
-
- @Override
- public void preSuperstep() { }
-
- @Override
- public void postSuperstep() {
- vertexCount = getTotalNumVertices();
- edgeCount = getTotalNumEdges();
- if (getSuperstep() == 1) {
- origEdgeCount = edgeCount;
- }
- LOG.info("Got " + vertexCount + " vertices, " +
- edgeCount + " edges on superstep " +
- getSuperstep());
- LOG.info("Removed " + edgesRemoved);
- edgesRemoved = 0;
- }
-
- public long getVertexCount() {
- return vertexCount;
- }
-
- public long getEdgeCount() {
- return edgeCount;
- }
-
- public long getOrigEdgeCount() {
- return origEdgeCount;
- }
-
- /**
- * Increase the number of edges removed by one.
- */
- public void increaseEdgesRemoved() {
- this.edgesRemoved++;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
deleted file mode 100644
index 1e010a1..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleOutDegreeCountVertex.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.giraph.vertex.EdgeListVertex;
-
-
-/**
- * Simple function to return the out degree for each vertex.
- */
-@Algorithm(
- name = "Outdegree Count"
-)
-public class SimpleOutDegreeCountVertex extends EdgeListVertex<
- LongWritable, LongWritable,
- DoubleWritable, DoubleWritable> {
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- LongWritable vertexValue = getValue();
- vertexValue.set(getNumEdges());
- setValue(vertexValue);
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
deleted file mode 100644
index ba0242d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.DoubleMaxAggregator;
-import org.apache.giraph.aggregators.DoubleMinAggregator;
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.DefaultEdge;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.vertex.LongDoubleFloatDoubleVertex;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Demonstrates the basic Pregel PageRank implementation.
- */
-@Algorithm(
- name = "Page rank"
-)
-public class SimplePageRankVertex extends LongDoubleFloatDoubleVertex {
- /** Number of supersteps for this test */
- public static final int MAX_SUPERSTEPS = 30;
- /** Logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertex.class);
- /** Sum aggregator name */
- private static String SUM_AGG = "sum";
- /** Min aggregator name */
- private static String MIN_AGG = "min";
- /** Max aggregator name */
- private static String MAX_AGG = "max";
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() >= 1) {
- double sum = 0;
- for (DoubleWritable message : messages) {
- sum += message.get();
- }
- DoubleWritable vertexValue =
- new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
- setValue(vertexValue);
- aggregate(MAX_AGG, vertexValue);
- aggregate(MIN_AGG, vertexValue);
- aggregate(SUM_AGG, new LongWritable(1));
- LOG.info(getId() + ": PageRank=" + vertexValue +
- " max=" + getAggregatedValue(MAX_AGG) +
- " min=" + getAggregatedValue(MIN_AGG));
- }
-
- if (getSuperstep() < MAX_SUPERSTEPS) {
- long edges = getNumEdges();
- sendMessageToAllEdges(
- new DoubleWritable(getValue().get() / edges));
- } else {
- voteToHalt();
- }
- }
-
- /**
- * Worker context used with {@link SimplePageRankVertex}.
- */
- public static class SimplePageRankVertexWorkerContext extends
- WorkerContext {
- /** Final max value for verification for local jobs */
- private static double FINAL_MAX;
- /** Final min value for verification for local jobs */
- private static double FINAL_MIN;
- /** Final sum value for verification for local jobs */
- private static long FINAL_SUM;
-
- public static double getFinalMax() {
- return FINAL_MAX;
- }
-
- public static double getFinalMin() {
- return FINAL_MIN;
- }
-
- public static long getFinalSum() {
- return FINAL_SUM;
- }
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- }
-
- @Override
- public void postApplication() {
- FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
- FINAL_MAX = this.<DoubleWritable>getAggregatedValue(MAX_AGG).get();
- FINAL_MIN = this.<DoubleWritable>getAggregatedValue(MIN_AGG).get();
-
- LOG.info("aggregatedNumVertices=" + FINAL_SUM);
- LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
- LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
- }
-
- @Override
- public void preSuperstep() {
- if (getSuperstep() >= 3) {
- LOG.info("aggregatedNumVertices=" +
- getAggregatedValue(SUM_AGG) +
- " NumVertices=" + getTotalNumVertices());
- if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
- getTotalNumVertices()) {
- throw new RuntimeException("wrong value of SumAggreg: " +
- getAggregatedValue(SUM_AGG) + ", should be: " +
- getTotalNumVertices());
- }
- DoubleWritable maxPagerank = getAggregatedValue(MAX_AGG);
- LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
- DoubleWritable minPagerank = getAggregatedValue(MIN_AGG);
- LOG.info("aggregatedMinPageRank=" + minPagerank.get());
- }
- }
-
- @Override
- public void postSuperstep() { }
- }
-
- /**
- * Master compute associated with {@link SimplePageRankVertex}.
- * It registers required aggregators.
- */
- public static class SimplePageRankVertexMasterCompute extends
- DefaultMasterCompute {
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(SUM_AGG, LongSumAggregator.class);
- registerPersistentAggregator(MIN_AGG, DoubleMinAggregator.class);
- registerPersistentAggregator(MAX_AGG, DoubleMaxAggregator.class);
- }
- }
-
- /**
- * Simple VertexReader that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexReader extends
- GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimplePageRankVertexReader.class);
-
- @Override
- public boolean nextVertex() {
- return totalRecords > recordsRead;
- }
-
- @Override
- public Vertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> getCurrentVertex() throws IOException {
- Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- vertex = configuration.createVertex();
-
- LongWritable vertexId = new LongWritable(
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
- DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
- long targetVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
- edges.add(new DefaultEdge<LongWritable, FloatWritable>(
- new LongWritable(targetVertexId),
- new FloatWritable(edgeValue)));
- vertex.initialize(vertexId, vertexValue, edges);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getId().get() +
- ", vertexValue=" + vertex.getValue() +
- ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
- }
-
- /**
- * Simple VertexInputFormat that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable,
- DoubleWritable, FloatWritable, DoubleWritable> {
- @Override
- public VertexReader<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> createVertexReader(InputSplit split,
- TaskAttemptContext context)
- throws IOException {
- return new SimplePageRankVertexReader();
- }
- }
-
- /**
- * Simple VertexOutputFormat that supports {@link SimplePageRankVertex}
- */
- public static class SimplePageRankVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SimplePageRankVertexWriter();
- }
-
- /**
- * Simple VertexWriter that supports {@link SimplePageRankVertex}
- */
- public class SimplePageRankVertexWriter extends TextVertexWriter {
- @Override
- public void writeVertex(
- Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
deleted file mode 100644
index 1bec7eb..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleShortestPathsVertex.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.log4j.Logger;
-
-/**
- * Demonstrates the basic Pregel shortest paths implementation.
- */
-@Algorithm(
- name = "Shortest paths",
- description = "Finds all shortest paths from a selected vertex"
-)
-public class SimpleShortestPathsVertex extends
- EdgeListVertex<LongWritable, DoubleWritable,
- FloatWritable, DoubleWritable> {
- /** The shortest paths id */
- public static final String SOURCE_ID = "SimpleShortestPathsVertex.sourceId";
- /** Default shortest paths id */
- public static final long SOURCE_ID_DEFAULT = 1;
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleShortestPathsVertex.class);
-
- /**
- * Is this vertex the source id?
- *
- * @return True if the source id
- */
- private boolean isSource() {
- return getId().get() ==
- getContext().getConfiguration().getLong(SOURCE_ID,
- SOURCE_ID_DEFAULT);
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) {
- if (getSuperstep() == 0) {
- setValue(new DoubleWritable(Double.MAX_VALUE));
- }
- double minDist = isSource() ? 0d : Double.MAX_VALUE;
- for (DoubleWritable message : messages) {
- minDist = Math.min(minDist, message.get());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getId() + " got minDist = " + minDist +
- " vertex value = " + getValue());
- }
- if (minDist < getValue().get()) {
- setValue(new DoubleWritable(minDist));
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
- double distance = minDist + edge.getValue().get();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex " + getId() + " sent to " +
- edge.getTargetVertexId() + " = " + distance);
- }
- sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
- }
- }
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
deleted file mode 100644
index 09efe88..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.DefaultEdge;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.io.VertexReader;
-import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Just a simple Vertex compute implementation that executes 3 supersteps, then
- * finishes.
- */
-public class SimpleSuperstepVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- @Override
- public void compute(Iterable<IntWritable> messages) {
- // Some checks for additional testing
- if (getTotalNumVertices() < 1) {
- throw new IllegalStateException("compute: Illegal total vertices " +
- getTotalNumVertices());
- }
- if (getTotalNumEdges() < 0) {
- throw new IllegalStateException("compute: Illegal total edges " +
- getTotalNumEdges());
- }
- if (isHalted()) {
- throw new IllegalStateException("compute: Impossible to be halted - " +
- isHalted());
- }
-
- if (getSuperstep() > 3) {
- voteToHalt();
- }
- }
-
- /**
- * Simple VertexReader that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexReader extends
- GeneratedVertexReader<LongWritable, IntWritable,
- FloatWritable, IntWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SimpleSuperstepVertexReader.class);
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return totalRecords > recordsRead;
- }
-
- @Override
- public Vertex<LongWritable, IntWritable, FloatWritable,
- IntWritable> getCurrentVertex()
- throws IOException, InterruptedException {
- Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
- configuration.createVertex();
- long tmpId = reverseIdOrder ?
- ((inputSplit.getSplitIndex() + 1) * totalRecords) -
- recordsRead - 1 :
- (inputSplit.getSplitIndex() * totalRecords) + recordsRead;
- LongWritable vertexId = new LongWritable(tmpId);
- IntWritable vertexValue =
- new IntWritable((int) (vertexId.get() * 10));
- List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
- long targetVertexId =
- (vertexId.get() + 1) %
- (inputSplit.getNumSplits() * totalRecords);
- float edgeValue = vertexId.get() * 100f;
- edges.add(new DefaultEdge<LongWritable, FloatWritable>(
- new LongWritable(targetVertexId),
- new FloatWritable(edgeValue)));
- vertex.initialize(vertexId, vertexValue, edges);
- ++recordsRead;
- if (LOG.isInfoEnabled()) {
- LOG.info("next: Return vertexId=" + vertex.getId().get() +
- ", vertexValue=" + vertex.getValue() +
- ", targetVertexId=" + targetVertexId +
- ", edgeValue=" + edgeValue);
- }
- return vertex;
- }
- }
-
- /**
- * Simple VertexInputFormat that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexInputFormat extends
- GeneratedVertexInputFormat<LongWritable,
- IntWritable, FloatWritable, IntWritable> {
- @Override
- public VertexReader<LongWritable, IntWritable, FloatWritable, IntWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- return new SimpleSuperstepVertexReader();
- }
- }
-
-
- /**
- * Simple VertexOutputFormat that supports {@link SimpleSuperstepVertex}
- */
- public static class SimpleSuperstepVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SimpleSuperstepVertexWriter();
- }
-
- /**
- * Simple VertexWriter that supports {@link SimpleSuperstepVertex}
- */
- public class SimpleSuperstepVertexWriter extends TextVertexWriter {
- @Override
- public void writeVertex(Vertex<LongWritable, IntWritable,
- FloatWritable, ?> vertex) throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
deleted file mode 100644
index a57c6d2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTextVertexOutputFormat.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import java.io.IOException;
-
-/**
- * Simple text based vertex output format example.
- */
-public class SimpleTextVertexOutputFormat extends
- TextVertexOutputFormat<LongWritable, IntWritable, FloatWritable> {
- /**
- * Simple text based vertex writer
- */
- private class SimpleTextVertexWriter extends TextVertexWriter {
- @Override
- public void writeVertex(
- Vertex<LongWritable, IntWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- getRecordWriter().write(
- new Text(vertex.getId().toString()),
- new Text(vertex.getValue().toString()));
- }
- }
-
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new SimpleTextVertexWriter();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
deleted file mode 100644
index 469de85..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleTriangleClosingVertex.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.utils.ArrayListWritable;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Demonstrates triangle closing in simple,
- * unweighted graphs for Giraph.
- *
- * Triangle Closing: Vertex A and B maintain out-edges to C and D
- * The algorithm, when finished, populates all vertices' value with an
- * array of Writables representing all the vertices that each
- * should form an out-edge to (connect with, if this is a social
- * graph.)
- * In this example, vertices A and B would hold empty arrays
- * since they are already connected with C and D. Results:
- * If the graph is undirected, C would hold value, D and D would
- * hold value C, since both are neighbors of A and B and yet both
- * were not previously connected to each other.
- *
- * In a social graph, the result values for vertex X would represent people
- * that are likely a part of a person X's social circle (they know one or more
- * people X is connected to already) but X had not previously met them yet.
- * Given this new information, X can decide to connect to vertices (peoople) in
- * the result array or not.
- *
- * Results at each vertex are ordered in terms of the # of neighbors
- * who are connected to each vertex listed in the final vertex value.
- * The more of a vertex's neighbors who "know" someone, the stronger
- * your social relationship is presumed to be to that vertex (assuming
- * a social graph) and the more likely you should connect with them.
- *
- * In this implementation, Edge Values are not used, but could be
- * adapted to represent additional qualities that could affect the
- * ordering of the final result array.
- */
-public class SimpleTriangleClosingVertex extends EdgeListVertex<
- IntWritable, SimpleTriangleClosingVertex.IntArrayListWritable,
- NullWritable, IntWritable> {
- /** Vertices to close the triangle, ranked by frequency of in-msgs */
- private Map<IntWritable, Integer> closeMap =
- Maps.<IntWritable, Integer>newHashMap();
-
- @Override
- public void compute(Iterable<IntWritable> messages) {
- if (getSuperstep() == 0) {
- // send list of this vertex's neighbors to all neighbors
- for (Edge<IntWritable, NullWritable> edge : getEdges()) {
- sendMessageToAllEdges(edge.getTargetVertexId());
- }
- } else {
- for (IntWritable message : messages) {
- final int current = (closeMap.get(message) == null) ?
- 0 : closeMap.get(message) + 1;
- closeMap.put(message, current);
- }
- // make sure the result values are sorted and
- // packaged in an IntArrayListWritable for output
- Set<SimpleTriangleClosingVertex.Pair> sortedResults =
- Sets.<SimpleTriangleClosingVertex.Pair>newTreeSet();
- for (Map.Entry<IntWritable, Integer> entry : closeMap.entrySet()) {
- sortedResults.add(new Pair(entry.getKey(), entry.getValue()));
- }
- SimpleTriangleClosingVertex.IntArrayListWritable
- outputList = new SimpleTriangleClosingVertex.IntArrayListWritable();
- for (SimpleTriangleClosingVertex.Pair pair : sortedResults) {
- if (pair.value > 0) {
- outputList.add(pair.key);
- } else {
- break;
- }
- }
- setValue(outputList);
- }
- voteToHalt();
- }
-
- /** Quick, immutable K,V storage for sorting in tree set */
- public static class Pair implements Comparable<Pair> {
- /** key
- * @param key the IntWritable key */
- private final IntWritable key;
- /** value
- * @param value the Integer value */
- private final Integer value;
- /** Constructor
- * @param k the key
- * @param v the value
- */
- public Pair(IntWritable k, Integer v) {
- key = k;
- value = v;
- }
- /** key getter
- * @return the key */
- public IntWritable getKey() { return key; }
- /** value getter
- * @return the value */
- public Integer getValue() { return value; }
- /** Comparator to quickly sort by values
- * @param other the Pair to compare with THIS
- * @return the comparison value as an integer */
- @Override
- public int compareTo(Pair other) {
- return other.value - this.value;
- }
- }
-
- /** Utility class for delivering the array of vertices THIS vertex
- * should connect with to close triangles with neighbors */
- public static class IntArrayListWritable
- extends ArrayListWritable<IntWritable> {
- /** Default constructor for reflection */
- public IntArrayListWritable() {
- super();
- }
- /** Set storage type for this ArrayListWritable */
- @Override
- @SuppressWarnings("unchecked")
- public void setClass() {
- setClass(IntWritable.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
deleted file mode 100644
index f6488d5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/SimpleVertexWithWorkerContext.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.examples.SimpleSuperstepVertex.
- SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * Fully runnable example of how to
- * emit worker data to HDFS during a graph
- * computation.
- */
-public class SimpleVertexWithWorkerContext implements Tool {
- /** Directory name of where to write. */
- public static final String OUTPUTDIR = "svwwc.outputdir";
- /** Halting condition for the number of supersteps */
- private static final int TESTLENGTH = 30;
- /** Configuration */
- private Configuration conf;
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- /**
- * Actual vetex implementation
- */
- public static class SimpleVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable,
- DoubleWritable> {
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
-
- long superstep = getSuperstep();
-
- if (superstep < TESTLENGTH) {
- EmitterWorkerContext emitter =
- (EmitterWorkerContext) getWorkerContext();
- emitter.emit("vertexId=" + getId() +
- " superstep=" + superstep + "\n");
- } else {
- voteToHalt();
- }
- }
- }
-
- /**
- * Example worker context to emit data as part of a superstep.
- */
- @SuppressWarnings("rawtypes")
- public static class EmitterWorkerContext extends WorkerContext {
- /** File name prefix */
- private static final String FILENAME = "emitter_";
- /** Output stream to dump the strings. */
- private DataOutputStream out;
-
- @Override
- public void preApplication() {
- Context context = getContext();
- FileSystem fs;
-
- try {
- fs = FileSystem.get(context.getConfiguration());
-
- String p = context.getConfiguration()
- .get(SimpleVertexWithWorkerContext.OUTPUTDIR);
- if (p == null) {
- throw new IllegalArgumentException(
- SimpleVertexWithWorkerContext.OUTPUTDIR +
- " undefined!");
- }
-
- Path path = new Path(p);
- if (!fs.exists(path)) {
- throw new IllegalArgumentException(path +
- " doesn't exist");
- }
-
- Path outF = new Path(path, FILENAME +
- context.getTaskAttemptID());
- if (fs.exists(outF)) {
- throw new IllegalArgumentException(outF +
- " aready exists");
- }
-
- out = fs.create(outF);
- } catch (IOException e) {
- throw new RuntimeException(
- "can't initialize WorkerContext", e);
- }
- }
-
- @Override
- public void postApplication() {
- if (out != null) {
- try {
- out.flush();
- out.close();
- } catch (IOException e) {
- throw new RuntimeException(
- "can't finalize WorkerContext", e);
- }
- out = null;
- }
- }
-
- @Override
- public void preSuperstep() { }
-
- @Override
- public void postSuperstep() { }
-
- /**
- * Write this string to the output stream.
- *
- * @param s String to dump.
- */
- public void emit(String s) {
- try {
- out.writeUTF(s);
- } catch (IOException e) {
- throw new RuntimeException("can't emit", e);
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2) {
- throw new IllegalArgumentException(
- "run: Must have 2 arguments <output path> <# of workers>");
- }
- GiraphJob job = new GiraphJob(getConf(), getClass().getName());
- job.getConfiguration().setVertexClass(SimpleVertex.class);
- job.getConfiguration().setVertexInputFormatClass(
- SimpleSuperstepVertexInputFormat.class);
- job.getConfiguration().setWorkerContextClass(EmitterWorkerContext.class);
- job.getConfiguration().set(
- SimpleVertexWithWorkerContext.OUTPUTDIR, args[0]);
- job.getConfiguration().setWorkerConfiguration(Integer.parseInt(args[1]),
- Integer.parseInt(args[1]),
- 100.0f);
- if (job.run(true)) {
- return 0;
- } else {
- return -1;
- }
- }
-
- /**
- * Executable from the command line.
- *
- * @param args Command line arguments.
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new SimpleVertexWithWorkerContext(), args));
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/VerifyMessage.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/VerifyMessage.java b/giraph-core/src/main/java/org/apache/giraph/examples/VerifyMessage.java
deleted file mode 100644
index 507a56b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/VerifyMessage.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.graph.DefaultEdge;
-import org.apache.giraph.graph.Edge;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * An example that simply uses its id, value, and edges to compute new data
- * every iteration to verify that messages are sent and received at the
- * appropriate location and superstep.
- */
-public class VerifyMessage {
- /**
- * Message that will be sent in {@link VerifyMessageVertex}.
- */
- public static class VerifiableMessage implements Writable {
- /** Superstep sent on */
- private long superstep;
- /** Source vertex id */
- private long sourceVertexId;
- /** Value */
- private float value;
-
- /**
- * Default constructor used with reflection.
- */
- public VerifiableMessage() { }
-
- /**
- * Constructor with verifiable arguments.
- * @param superstep Superstep this message was created on.
- * @param sourceVertexId Who send this message.
- * @param value A value associated with this message.
- */
- public VerifiableMessage(
- long superstep, long sourceVertexId, float value) {
- this.superstep = superstep;
- this.sourceVertexId = sourceVertexId;
- this.value = value;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- superstep = input.readLong();
- sourceVertexId = input.readLong();
- value = input.readFloat();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeLong(superstep);
- output.writeLong(sourceVertexId);
- output.writeFloat(value);
- }
-
- @Override
- public String toString() {
- return "(superstep=" + superstep + ",sourceVertexId=" +
- sourceVertexId + ",value=" + value + ")";
- }
- }
-
- /**
- * Send and verify messages.
- */
- public static class VerifyMessageVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable,
- VerifiableMessage> {
- /** Dynamically set number of SUPERSTEPS */
- public static final String SUPERSTEP_COUNT =
- "verifyMessageVertex.superstepCount";
- /** User can access this after the application finishes if local */
- private static long FINAL_SUM;
- /** Number of SUPERSTEPS to run (6 by default) */
- private static int SUPERSTEPS = 6;
- /** Class logger */
- private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
-
- public static long getFinalSum() {
- return FINAL_SUM;
- }
-
- /**
- * Worker context used with {@link VerifyMessageVertex}.
- */
- public static class VerifyMessageVertexWorkerContext extends
- WorkerContext {
- @Override
- public void preApplication() throws InstantiationException,
- IllegalAccessException {
- SUPERSTEPS = getContext().getConfiguration().getInt(
- SUPERSTEP_COUNT, SUPERSTEPS);
- }
-
- @Override
- public void postApplication() {
- LongWritable sumAggregatorValue =
- getAggregatedValue(LongSumAggregator.class.getName());
- FINAL_SUM = sumAggregatorValue.get();
- }
-
- @Override
- public void preSuperstep() {
- }
-
- @Override
- public void postSuperstep() { }
- }
-
- @Override
- public void compute(Iterable<VerifiableMessage> messages) {
- String sumAggregatorName = LongSumAggregator.class.getName();
- if (getSuperstep() > SUPERSTEPS) {
- voteToHalt();
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: " + getAggregatedValue(sumAggregatorName));
- }
- aggregate(sumAggregatorName, new LongWritable(getId().get()));
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: sum = " +
- this.<LongWritable>getAggregatedValue(sumAggregatorName).get() +
- " for vertex " + getId());
- }
- float msgValue = 0.0f;
- for (VerifiableMessage message : messages) {
- msgValue += message.value;
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: got msg = " + message +
- " for vertex id " + getId() +
- ", vertex value " + getValue() +
- " on superstep " + getSuperstep());
- }
- if (message.superstep != getSuperstep() - 1) {
- throw new IllegalStateException(
- "compute: Impossible to not get a messsage from " +
- "the previous superstep, current superstep = " +
- getSuperstep());
- }
- if ((message.sourceVertexId != getId().get() - 1) &&
- (getId().get() != 0)) {
- throw new IllegalStateException(
- "compute: Impossible that this message didn't come " +
- "from the previous vertex and came from " +
- message.sourceVertexId);
- }
- }
- int vertexValue = getValue().get();
- setValue(new IntWritable(vertexValue + (int) msgValue));
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getId() +
- " has value " + getValue() +
- " on superstep " + getSuperstep());
- }
- for (Edge<LongWritable, FloatWritable> edge : getEdges()) {
- FloatWritable newEdgeValue = new FloatWritable(
- edge.getValue().get() + (float) vertexValue);
- Edge<LongWritable, FloatWritable> newEdge =
- new DefaultEdge<LongWritable, FloatWritable>(
- edge.getTargetVertexId(),
- newEdgeValue);
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getId() +
- " sending edgeValue " + edge.getValue() +
- " vertexValue " + vertexValue +
- " total " + newEdgeValue +
- " to vertex " + edge.getTargetVertexId() +
- " on superstep " + getSuperstep());
- }
- addEdge(newEdge);
- sendMessage(edge.getTargetVertexId(),
- new VerifiableMessage(
- getSuperstep(), getId().get(), newEdgeValue.get()));
- }
- }
- }
-
- /**
- * Master compute associated with {@link VerifyMessageVertex}.
- * It registers required aggregators.
- */
- public static class VerifyMessageMasterCompute extends
- DefaultMasterCompute {
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java b/giraph-core/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
deleted file mode 100644
index ef58bb8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/VertexWithDoubleValueFloatEdgeTextOutputFormat.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import java.io.IOException;
-
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.io.formats.TextVertexOutputFormat;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * Simple vertex output format for weighted graphs.
- */
-public class VertexWithDoubleValueFloatEdgeTextOutputFormat extends
- TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
- @Override
- public TextVertexWriter createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- return new VertexWithDoubleValueWriter();
- }
-
- /**
- * Vertex writer used with {@link VertexWithComponentTextOutputFormat}.
- */
- public class VertexWithDoubleValueWriter extends TextVertexWriter {
- @Override
- public void writeVertex(
- Vertex<LongWritable, DoubleWritable, FloatWritable, ?> vertex)
- throws IOException, InterruptedException {
- StringBuilder output = new StringBuilder();
- output.append(vertex.getId().get());
- output.append('\t');
- output.append(vertex.getValue().get());
- getRecordWriter().write(new Text(output.toString()), null);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/main/java/org/apache/giraph/examples/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/package-info.java b/giraph-core/src/main/java/org/apache/giraph/examples/package-info.java
deleted file mode 100644
index 3ebb72b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/examples/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of Giraph examples.
- */
-package org.apache.giraph.examples;
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index 0fe9fda..4867a50 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -21,7 +21,6 @@ package org.apache.giraph;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.utils.FileUtils;
import org.apache.giraph.zk.ZooKeeperExt;
@@ -65,6 +64,9 @@ public class BspCase implements Watcher {
/** Default path for temporary files */
static final Path DEFAULT_TEMP_DIR =
new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
+
+ public static final String READER_VERTICES =
+ "GeneratedVertexReader.reader_vertices";
/** A filter for listing parts files */
static final PathFilter PARTS_FILTER = new PathFilter() {
@@ -104,7 +106,7 @@ public class BspCase implements Watcher {
conf.setZooKeeperConfiguration(getZooKeeperList());
}
// GeneratedInputSplit will generate 5 vertices
- conf.setLong(GeneratedVertexReader.READER_VERTICES, 5);
+ conf.setLong(READER_VERTICES, 5);
// Setup pathes for temporary files
Path zookeeperDir = getTempPath("_bspZooKeeper");
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java b/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
deleted file mode 100644
index efbe320..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestAutoCheckpoint.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.SimpleCheckpointVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Unit test for automated checkpoint restarting
- */
-public class TestAutoCheckpoint extends BspCase {
-
- public TestAutoCheckpoint() {
- super(TestAutoCheckpoint.class.getName());
- }
-
- /**
- * Run a job that requires checkpointing and will have a worker crash
- * and still recover from a previous checkpoint.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testSingleFault()
- throws IOException, InterruptedException, ClassNotFoundException {
- if (!runningInDistributedMode()) {
- System.out.println(
- "testSingleFault: Ignore this test in local mode.");
- return;
- }
- Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(
- SimpleCheckpointVertex.SimpleCheckpointComputation.class);
- classes.setWorkerContextClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexWorkerContext.class);
- classes.setMasterComputeClass(
- SimpleCheckpointVertex.SimpleCheckpointVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
-
- GiraphConfiguration conf = job.getConfiguration();
- conf.setBoolean(SimpleCheckpointVertex.ENABLE_FAULT, true);
- conf.setInt("mapred.map.max.attempts", 4);
- // Trigger failure faster
- conf.setInt("mapred.task.timeout", 10000);
- conf.setMaxMasterSuperstepWaitMsecs(10000);
- conf.setEventWaitMsecs(1000);
- conf.setCheckpointFrequency(2);
- conf.set(GiraphConstants.CHECKPOINT_DIRECTORY,
- getTempPath("_singleFaultCheckpoints").toString());
- conf.setBoolean(GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS, false);
- conf.setInt(GiraphConstants.ZOOKEEPER_SESSION_TIMEOUT, 10000);
- conf.setInt(GiraphConstants.ZOOKEEPER_MIN_SESSION_TIMEOUT, 10000);
-
- assertTrue(job.run(true));
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ab64a4d0/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
deleted file mode 100644
index 0d6d1d0..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph;
-
-import org.apache.giraph.conf.GiraphClasses;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.examples.GeneratedVertexReader;
-import org.apache.giraph.examples.SimpleCombinerVertex;
-import org.apache.giraph.examples.SimpleFailVertex;
-import org.apache.giraph.examples.SimpleMasterComputeVertex;
-import org.apache.giraph.examples.SimpleMsgVertex;
-import org.apache.giraph.examples.SimplePageRankVertex;
-import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
-import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import org.apache.giraph.combiner.SimpleSumCombiner;
-import org.apache.giraph.examples.SimpleSuperstepVertex;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.vertex.EdgeListVertex;
-import org.apache.giraph.job.GiraphJob;
-import org.apache.giraph.worker.InputSplitPathOrganizer;
-import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexOutputFormat;
-import org.apache.giraph.zk.ZooKeeperExt;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.Closeables;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
-import org.apache.hadoop.mapreduce.JobContext;
-else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
-
-/**
- * Unit test for many simple BSP applications.
- */
-public class TestBspBasic extends BspCase {
-
- public TestBspBasic() {
- super(TestBspBasic.class.getName());
- }
-
- /**
- * Just instantiate the vertex (all functions are implemented) and the
- * VertexInputFormat using reflection.
- *
- * @throws IllegalAccessException
- * @throws InstantiationException
- * @throws InterruptedException
- * @throws IOException
- * @throws InvocationTargetException
- * @throws IllegalArgumentException
- * @throws NoSuchMethodException
- * @throws SecurityException
- */
- @Test
- public void testInstantiateVertex()
- throws InstantiationException, IllegalAccessException,
- IOException, InterruptedException, IllegalArgumentException,
- InvocationTargetException, SecurityException, NoSuchMethodException {
- System.out.println("testInstantiateVertex: java.class.path=" +
- System.getProperty("java.class.path"));
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(
- SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- ImmutableClassesGiraphConfiguration configuration =
- new ImmutableClassesGiraphConfiguration(job.getConfiguration());
- Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
- configuration.createVertex();
- System.out.println("testInstantiateVertex: Got vertex " + vertex);
- VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable>
- inputFormat = configuration.createVertexInputFormat();
-/*if[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]
- List<InputSplit> splitArray =
- inputFormat.getSplits(
- new JobContext(new Configuration(), new JobID()), 1);
-else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
- List<InputSplit> splitArray =
- inputFormat.getSplits(
- new JobContextImpl(new Configuration(), new JobID()), 1);
-/*end[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
- ((Writable) splitArray.get(0)).write(outputStream);
- System.out.println("testInstantiateVertex: Example output split = " +
- byteArrayOutputStream.toString());
- }
-
- private static class NullVertex extends EdgeListVertex<
- NullWritable, NullWritable, NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException { }
- }
-
- /**
- * Test whether vertices with NullWritable for vertex value type, edge value
- * type and message value type can be instantiated.
- */
- @Test
- public void testInstantiateNullVertex() throws IOException {
- GiraphConfiguration nullConf = new GiraphConfiguration();
- nullConf.setVertexClass(NullVertex.class);
- ImmutableClassesGiraphConfiguration<
- NullWritable, NullWritable, NullWritable,
- NullWritable> immutableClassesGiraphConfiguration =
- new ImmutableClassesGiraphConfiguration<
- NullWritable, NullWritable, NullWritable, NullWritable>(
- nullConf);
- NullWritable vertexValue =
- immutableClassesGiraphConfiguration.createVertexValue();
- NullWritable edgeValue =
- immutableClassesGiraphConfiguration.createEdgeValue();
- NullWritable messageValue =
- immutableClassesGiraphConfiguration.createMessageValue();
- assertSame(vertexValue.getClass(), NullWritable.class);
- assertSame(vertexValue, edgeValue);
- assertSame(edgeValue, messageValue);
- }
-
- /**
- * Do some checks for local job runner.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testLocalJobRunnerConfig()
- throws IOException, InterruptedException, ClassNotFoundException {
- if (runningInDistributedMode()) {
- System.out.println("testLocalJobRunnerConfig: Skipping for " +
- "non-local");
- return;
- }
- GiraphClasses classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- GiraphConfiguration conf = job.getConfiguration();
- conf.setWorkerConfiguration(5, 5, 100.0f);
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, true);
-
- try {
- job.run(true);
- fail();
- } catch (IllegalArgumentException e) {
- }
-
- conf.setBoolean(GiraphConstants.SPLIT_MASTER_WORKER, false);
- try {
- job.run(true);
- fail();
- } catch (IllegalArgumentException e) {
- }
- job.getConfiguration().setWorkerConfiguration(1, 1, 100.0f);
- job.run(true);
- }
-
- /**
- * Run a sample BSP job in JobTracker, kill a task, and make sure
- * the job fails (not enough attempts to restart)
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspFail()
- throws IOException, InterruptedException, ClassNotFoundException {
- // Allow this test only to be run on a real Hadoop setup
- if (!runningInDistributedMode()) {
- System.out.println("testBspFail: not executed for local setup.");
- return;
- }
-
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleFailVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes,
- getTempPath(getCallingMethodName()));
- job.getConfiguration().setInt("mapred.map.max.attempts", 1);
- assertTrue(!job.run(true));
- }
-
- /**
- * Run a sample BSP job locally and test supersteps.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspSuperStep()
- throws IOException, InterruptedException, ClassNotFoundException {
- String callingMethod = getCallingMethodName();
- Path outputPath = getTempPath(callingMethod);
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleSuperstepVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- classes.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- GiraphJob job = prepareJob(callingMethod, classes, outputPath);
- Configuration conf = job.getConfiguration();
- conf.setFloat(GiraphConstants.TOTAL_INPUT_SPLIT_MULTIPLIER, 2.0f);
- // GeneratedInputSplit will generate 10 vertices
- conf.setLong(GeneratedVertexReader.READER_VERTICES, 10);
- assertTrue(job.run(true));
- if (!runningInDistributedMode()) {
- FileStatus fileStatus = getSinglePartFileStatus(conf, outputPath);
- assertEquals(49l, fileStatus.getLen());
- }
- }
-
- /**
- * Run a sample BSP job locally and test messages.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspMsg()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMsgVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- assertTrue(job.run(true));
- }
-
-
- /**
- * Run a sample BSP job locally with no vertices and make sure
- * it completes.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testEmptyVertexInputFormat()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMsgVertex.class);
- classes.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
- assertTrue(job.run(true));
- }
-
- /**
- * Run a sample BSP job locally with combiner and checkout output value.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspCombiner()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, IntWritable, FloatWritable, IntWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleCombinerVertex.class);
- classes.setVertexInputFormatClass(
- SimpleSuperstepVertexInputFormat.class);
- classes.setCombinerClass(SimpleSumCombiner.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- assertTrue(job.run(true));
- }
-
- /**
- * Run a test to see if the InputSplitPathOrganizer can correctly sort
- * locality information from a mocked znode of data.
- * @throws IOException
- * @throws KeeperException
- * @throws InterruptedException
- */
- @Test
- public void testInputSplitPathOrganizer()
- throws IOException, KeeperException, InterruptedException {
- final List<String> testList = new ArrayList<String>();
- Collections.addAll(testList, "remote2", "local", "remote1");
- final String localHost = "node.LOCAL.com";
- final String testListName = "test_list_parent_znode";
- // build output just as we do to store hostlists in ZNODES
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- String last = "node.test4.com\tnode.test5.com\tnode.test6.com";
- Text.writeString(dos, last);
- byte[] remote1 = baos.toByteArray();
- baos = new ByteArrayOutputStream();
- dos = new DataOutputStream(baos);
- String middle = "node.test1.com\tnode.test2.com\tnode.test3.com";
- Text.writeString(dos, middle);
- byte[] remote2 = baos.toByteArray();
- baos = new ByteArrayOutputStream();
- dos = new DataOutputStream(baos);
- String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com";
- Text.writeString(dos, first);
- byte[] local = baos.toByteArray();
- ZooKeeperExt zk = mock(ZooKeeperExt.class);
- when(zk.getChildrenExt(testListName, false, false, true)).
- thenReturn(testList);
- when(zk.getData("remote1", false, null)).thenReturn(remote1);
- when(zk.getData("remote2", false, null)).thenReturn(remote2);
- when(zk.getData("local", false, null)).thenReturn(local);
- InputSplitPathOrganizer lis =
- new InputSplitPathOrganizer(zk, testListName, localHost, true);
- final List<String> resultList = Lists.newArrayList(lis.getPathList());
- assertEquals("local", resultList.get(0));
- }
-
- /**
- * Run a sample BSP job locally and test shortest paths.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspShortestPaths()
- throws IOException, InterruptedException, ClassNotFoundException {
- Path outputPath = getTempPath(getCallingMethodName());
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleShortestPathsVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(
- JsonLongDoubleFloatDoubleVertexOutputFormat.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- Configuration conf = job.getConfiguration();
- conf.setLong(SimpleShortestPathsVertex.SOURCE_ID, 0);
-
- assertTrue(job.run(true));
-
- int numResults = getNumResults(job.getConfiguration(), outputPath);
-
- int expectedNumResults = runningInDistributedMode() ? 15 : 5;
- assertEquals(expectedNumResults, numResults);
- }
-
- /**
- * Run a sample BSP job locally and test PageRank with AggregatorWriter.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspPageRankWithAggregatorWriter()
- throws IOException, InterruptedException, ClassNotFoundException {
- Path outputPath = getTempPath(getCallingMethodName());
-
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimplePageRankVertex.class);
- classes.setAggregatorWriterClass(TextAggregatorWriter.class);
- classes.setMasterComputeClass(
- SimplePageRankVertex.SimplePageRankVertexMasterCompute.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setVertexOutputFormatClass(
- SimplePageRankVertex.SimplePageRankVertexOutputFormat.class);
- classes.setWorkerContextClass(
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes, outputPath);
- GiraphConfiguration configuration = job.getConfiguration();
- Path aggregatorValues = getTempPath("aggregatorValues");
- configuration.setInt(TextAggregatorWriter.FREQUENCY,
- TextAggregatorWriter.ALWAYS);
- configuration.set(TextAggregatorWriter.FILENAME,
- aggregatorValues.toString());
-
- assertTrue(job.run(true));
-
- FileSystem fs = FileSystem.get(configuration);
- Path valuesFile = new Path(aggregatorValues.toString() + "_0");
-
- try {
- if (!runningInDistributedMode()) {
- double maxPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMax();
- double minPageRank =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalMin();
- long numVertices =
- SimplePageRankVertex.SimplePageRankVertexWorkerContext.getFinalSum();
- System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
- " minPageRank=" + minPageRank + " numVertices=" + numVertices);
-
- FSDataInputStream in = null;
- BufferedReader reader = null;
- try {
- Map<Integer, Double> minValues = Maps.newHashMap();
- Map<Integer, Double> maxValues = Maps.newHashMap();
- Map<Integer, Long> vertexCounts = Maps.newHashMap();
-
- in = fs.open(valuesFile);
- reader = new BufferedReader(new InputStreamReader(in,
- Charsets.UTF_8));
- String line;
- while ((line = reader.readLine()) != null) {
- String[] tokens = line.split("\t");
- int superstep = Integer.parseInt(tokens[0].split("=")[1]);
- String value = (tokens[1].split("=")[1]);
- String aggregatorName = (tokens[1].split("=")[0]);
-
- if ("min".equals(aggregatorName)) {
- minValues.put(superstep, Double.parseDouble(value));
- }
- if ("max".equals(aggregatorName)) {
- maxValues.put(superstep, Double.parseDouble(value));
- }
- if ("sum".equals(aggregatorName)) {
- vertexCounts.put(superstep, Long.parseLong(value));
- }
- }
-
- int maxSuperstep = SimplePageRankVertex.MAX_SUPERSTEPS;
- assertEquals(maxSuperstep + 2, minValues.size());
- assertEquals(maxSuperstep + 2, maxValues.size());
- assertEquals(maxSuperstep + 2, vertexCounts.size());
-
- assertEquals(maxPageRank, (double) maxValues.get(maxSuperstep), 0d);
- assertEquals(minPageRank, (double) minValues.get(maxSuperstep), 0d);
- assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep));
-
- } finally {
- Closeables.closeQuietly(in);
- Closeables.closeQuietly(reader);
- }
- }
- } finally {
- fs.delete(valuesFile, false);
- }
- }
-
- /**
- * Run a sample BSP job locally and test MasterCompute.
- *
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
- */
- @Test
- public void testBspMasterCompute()
- throws IOException, InterruptedException, ClassNotFoundException {
- GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
- classes = new GiraphClasses();
- classes.setVertexClass(SimpleMasterComputeVertex.class);
- classes.setVertexInputFormatClass(SimplePageRankVertexInputFormat.class);
- classes.setMasterComputeClass(
- SimpleMasterComputeVertex.SimpleMasterCompute.class);
- classes.setWorkerContextClass(
- SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.class);
- GiraphJob job = prepareJob(getCallingMethodName(), classes);
- assertTrue(job.run(true));
- if (!runningInDistributedMode()) {
- double finalSum =
- SimpleMasterComputeVertex.SimpleMasterComputeWorkerContext.getFinalSum();
- System.out.println("testBspMasterCompute: finalSum=" + finalSum);
- assertEquals(32.5, finalSum, 0d);
- }
- }
-}