You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:22:12 UTC
[29/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/de0efb07
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/de0efb07
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/de0efb07
Branch: refs/heads/release-1.1
Commit: de0efb07518082439b9a5cccd503270b09f40e84
Parents: 5adca63
Author: Maja Kabiljo <ma...@fb.com>
Authored: Tue Aug 26 12:09:54 2014 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Tue Aug 26 12:11:00 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
giraph-examples/pom.xml | 4 +
.../giraph/examples/scc/SccComputation.java | 213 +++++++++++++++++++
.../scc/SccLongLongNullTextInputFormat.java | 90 ++++++++
.../examples/scc/SccPhaseMasterCompute.java | 136 ++++++++++++
.../giraph/examples/scc/SccVertexValue.java | 157 ++++++++++++++
.../giraph/examples/scc/package-info.java | 21 ++
.../scc/SccComputationTestInMemory.java | 128 +++++++++++
8 files changed, 751 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index b64ce2c..d5b284e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo)
+
GIRAPH-933: Checkpointing improvements (edunov via majakabiljo)
GIRAPH-943: Perf regression due to netty 4.0.21 (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 90f6889..f8304a1 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -415,6 +415,10 @@ under the License.
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil</artifactId>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java
new file mode 100644
index 0000000..ca194f6
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples.scc;
+
+import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE;
+import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM;
+import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED;
+
+import java.io.IOException;
+
+import org.apache.giraph.Algorithm;
+import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Finds strongly connected components of the graph.
+ */
+@Algorithm(name = "Strongly Connected Components",
+ description = "Finds strongly connected components of the graph")
+public class SccComputation extends
+ BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> {
+
+ /**
+ * Current phase of the computation as defined in SccPhaseMasterCompute
+ */
+ private Phases currPhase;
+
+ /**
+ * Reusable object to encapsulate message value, in order to avoid
+ * creating a new instance every time a message is sent.
+ */
+ private LongWritable messageValue = new LongWritable();
+
+ /**
+ * Reusable object to encapsulate a parent vertex id.
+ */
+ private LongWritable parentId = new LongWritable();
+
+ @Override
+ public void preSuperstep() {
+ IntWritable phaseInt = getAggregatedValue(PHASE);
+ currPhase = SccPhaseMasterCompute.getPhase(phaseInt);
+ }
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
+ Iterable<LongWritable> messages) throws IOException {
+
+ SccVertexValue vertexValue = vertex.getValue();
+
+ if (!vertexValue.isActive()) {
+ vertex.voteToHalt();
+ return;
+ }
+
+ switch (currPhase) {
+ case TRANSPOSE :
+ vertexValue.clearParents();
+ sendMessageToAllEdges(vertex, vertex.getId());
+ break;
+ case TRIMMING :
+ trim(vertex, messages);
+ break;
+ case FORWARD_TRAVERSAL :
+ forwardTraversal(vertex, messages);
+ break;
+ case BACKWARD_TRAVERSAL_START :
+ backwardTraversalStart(vertex);
+ break;
+ case BACKWARD_TRAVERSAL_REST :
+ backwardTraversalRest(vertex, messages);
+ break;
+ default :
+ break;
+ }
+
+ }
+
+ /**
+ * Creates list of parents based on the received ids and halts the vertices
+ * that don't have any parent or outgoing edge, hence, they can't be
+ * part of an SCC.
+ * @param vertex Current vertex.
+ * @param messages Received ids from the Transpose phase.
+ */
+ private void trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
+ Iterable<LongWritable> messages) {
+ SccVertexValue vertexValue = vertex.getValue();
+ // Keep the ids of the parent nodes to allow for backwards traversal
+ for (LongWritable parent : messages) {
+ vertexValue.addParent(parent.get());
+ }
+ // If this node doesn't have any parents or outgoing edges,
+ // it can't be part of an SCC
+ vertexValue.set(vertex.getId().get());
+ if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) {
+ vertexValue.deactivate();
+ } else {
+ messageValue.set(vertexValue.get());
+ sendMessageToAllEdges(vertex, messageValue);
+ }
+ }
+
+ /**
+ * Traverse the graph through outgoing edges and keep the maximum vertex
+ * value.
+ * If a new maximum value is found, propagate it until convergence.
+ * @param vertex Current vertex.
+ * @param messages Received values from neighbor vertices.
+ */
+ private void forwardTraversal(
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
+ Iterable<LongWritable> messages) {
+ SccVertexValue vertexValue = vertex.getValue();
+ boolean changed = setMaxValue(vertexValue, messages);
+ if (changed) {
+ messageValue.set(vertexValue.get());
+ sendMessageToAllEdges(vertex, messageValue);
+ aggregate(NEW_MAXIMUM, new BooleanWritable(true));
+ }
+ }
+
+ /**
+ * Traverse the transposed graph and keep the maximum vertex value.
+ * @param vertex Current vertex.
+ */
+ private void backwardTraversalStart(
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex) {
+ SccVertexValue vertexValue = vertex.getValue();
+ if (vertexValue.get() == vertex.getId().get()) {
+ messageValue.set(vertexValue.get());
+ sendMessageToAllParents(vertex, messageValue);
+ }
+ }
+
+ /**
+ * Traverse the transposed graph and keep the maximum vertex value.
+ * @param vertex Current vertex.
+ * @param messages Received values from children vertices.
+ */
+ private void backwardTraversalRest(
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
+ Iterable<LongWritable> messages) {
+ SccVertexValue vertexValue = vertex.getValue();
+ for (LongWritable m : messages) {
+ if (vertexValue.get() == m.get()) {
+ sendMessageToAllParents(vertex, m);
+ aggregate(CONVERGED, new BooleanWritable(true));
+ vertexValue.deactivate();
+ vertex.voteToHalt();
+ break;
+ }
+ }
+ }
+
+ /**
+ * Compares the messages values with the current vertex value and finds
+ * the maximum.
+ * If the maximum value is different from the vertex value, makes it the
+ * new vertex value and returns true, otherwise, returns false.
+ * @param vertexValue Current vertex value.
+ * @param messages Messages containing neighbors' vertex values.
+ * @return True if a new maximum was found, otherwise, returns false.
+ */
+ private boolean setMaxValue(SccVertexValue vertexValue,
+ Iterable<LongWritable> messages) {
+ boolean changed = false;
+ for (LongWritable m : messages) {
+ if (vertexValue.get() < m.get()) {
+ vertexValue.set(m.get());
+ changed = true;
+ }
+ }
+ return changed;
+ }
+
+
+ /**
+ * Send message to all parents.
+ * @param vertex Current vertex.
+ * @param message Message to be sent.
+ */
+ private void sendMessageToAllParents(
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex,
+ LongWritable message) {
+ for (Long id : vertex.getValue().getParents()) {
+ parentId.set(id);
+ sendMessage(parentId, message);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java
new file mode 100644
index 0000000..e5a4c86
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples.scc;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.io.formats.TextVertexInputFormat;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with long ids.
+ *
+ * Each line consists of: vertex neighbor1 neighbor2 ...
+ */
+public class SccLongLongNullTextInputFormat extends
+ TextVertexInputFormat<LongWritable, SccVertexValue, NullWritable> {
+ /** Separator of the vertex and neighbors */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+ @Override
+ public TextVertexReader createVertexReader(InputSplit split,
+ TaskAttemptContext context)
+ throws IOException {
+ return new LongLongNullVertexReader();
+ }
+
+ /**
+ * Vertex reader associated with {@link LongLongNullLongTextInputFormat}.
+ */
+ public class LongLongNullVertexReader extends
+ TextVertexReaderFromEachLineProcessed<String[]> {
+ /** Cached vertex id for the current line */
+ private LongWritable id;
+
+ @Override
+ protected String[] preprocessLine(Text line) throws IOException {
+ String[] tokens = SEPARATOR.split(line.toString());
+ id = new LongWritable(Long.parseLong(tokens[0]));
+ return tokens;
+ }
+
+ @Override
+ protected LongWritable getId(String[] tokens) throws IOException {
+ return id;
+ }
+
+ @Override
+ protected SccVertexValue getValue(String[] tokens) throws IOException {
+ return new SccVertexValue(Long.parseLong(tokens[0]));
+ }
+
+ @Override
+ protected Iterable<Edge<LongWritable, NullWritable>> getEdges(
+ String[] tokens) throws IOException {
+ List<Edge<LongWritable, NullWritable>> edges =
+ Lists.newArrayListWithCapacity(tokens.length - 1);
+ for (int n = 1; n < tokens.length; n++) {
+ edges.add(EdgeFactory.create(
+ new LongWritable(Long.parseLong(tokens[n]))));
+ }
+ return edges;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java
new file mode 100644
index 0000000..f5fd82b
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples.scc;
+
+import org.apache.giraph.aggregators.BooleanOverwriteAggregator;
+import org.apache.giraph.aggregators.IntOverwriteAggregator;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+
+/**
+ * This master compute keeps track of what phase is being currently executed by
+ * the Strongly Connected Components computation. The phases comprehend the
+ * following: 1 - Transpose (comprehends 2 supersteps, one to propagate parent
+ * vertices ids and another one to store them by their respective children) 2 -
+ * Trimming (this phase can happen multiple times) 3 - Forward Traversal 4 -
+ * Backward Traversal
+ */
+public class SccPhaseMasterCompute extends DefaultMasterCompute {
+
+ /**
+ * Aggregator that stores the current phase
+ */
+ public static final String PHASE = "scccompute.phase";
+
+ /**
+ * Flags whether a new maximum was found in the Forward Traversal phase
+ */
+ public static final String NEW_MAXIMUM = "scccompute.max";
+
+ /**
+ * Flags whether a vertex converged in the Backward Traversal phase
+ */
+ public static final String CONVERGED = "scccompute.converged";
+
+ /**
+ * Enumerates the possible phases of the algorithm.
+ */
+ public enum Phases {
+ /** Tranpose and Trimming phases **/
+ TRANSPOSE, TRIMMING,
+ /** Maximum id propagation **/
+ FORWARD_TRAVERSAL,
+ /** Vertex convergence in SCC **/
+ BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST
+ };
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerPersistentAggregator(PHASE, IntOverwriteAggregator.class);
+ registerAggregator(NEW_MAXIMUM, BooleanOverwriteAggregator.class);
+ registerAggregator(CONVERGED, BooleanOverwriteAggregator.class);
+ }
+
+ @Override
+ public void compute() {
+ if (getSuperstep() == 0) {
+ setPhase(Phases.TRANSPOSE);
+ } else {
+ Phases currPhase = getPhase();
+ switch (currPhase) {
+ case TRANSPOSE:
+ setPhase(Phases.TRIMMING);
+ break;
+ case TRIMMING :
+ setPhase(Phases.FORWARD_TRAVERSAL);
+ break;
+ case FORWARD_TRAVERSAL :
+ BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM);
+ // If no new maximum value was found it means the propagation
+ // converged, so we can move to the next phase
+ if (!newMaxFound.get()) {
+ setPhase(Phases.BACKWARD_TRAVERSAL_START);
+ }
+ break;
+ case BACKWARD_TRAVERSAL_START :
+ setPhase(Phases.BACKWARD_TRAVERSAL_REST);
+ break;
+ case BACKWARD_TRAVERSAL_REST :
+ BooleanWritable converged = getAggregatedValue(CONVERGED);
+ if (!converged.get()) {
+ setPhase(Phases.TRANSPOSE);
+ }
+ break;
+ default :
+ break;
+ }
+ }
+ }
+
+ /**
+ * Sets the next phase of the algorithm.
+ * @param phase
+ * Next phase.
+ */
+ private void setPhase(Phases phase) {
+ setAggregatedValue(PHASE, new IntWritable(phase.ordinal()));
+ }
+
+ /**
+ * Get current phase.
+ * @return Current phase as enumerator.
+ */
+ private Phases getPhase() {
+ IntWritable phaseInt = getAggregatedValue(PHASE);
+ return getPhase(phaseInt);
+ }
+
+ /**
+ * Helper function to convert from internal aggregated value to a Phases
+ * enumerator.
+ * @param phaseInt
+ * An integer that matches a position in the Phases enumerator.
+ * @return A Phases' item for the given position.
+ */
+ public static Phases getPhase(IntWritable phaseInt) {
+ return Phases.values()[phaseInt.get()];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java
new file mode 100644
index 0000000..63c23c5
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples.scc;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Vertex value for the Strongly Connected Components algorithm. It keeps track
+ * of the parents of the vertex in order to traverse the graph backwards.
+ */
+public class SccVertexValue implements Writable {
+
+ /** Vertex's parents **/
+ private LongArrayList parents;
+
+ /** Current vertex value **/
+ private long value = Long.MIN_VALUE;
+
+ /** Indicates whether the vertex was trimmed, hence,
+ * it can't be part of the computation anymore.
+ */
+ private boolean active = true;
+
+ /**
+ * Public constructor required for serialization.
+ */
+ public SccVertexValue() {
+ }
+
+ /**
+ * Constructor
+ * @param value Initial value for this vertex.
+ */
+ public SccVertexValue(long value) {
+ this.value = value;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ value = in.readLong();
+
+ int size = in.readInt();
+ if (size != 0) {
+ for (int i = 0; i < size; i++) {
+ addParent(in.readLong());
+ }
+ }
+
+ active = in.readBoolean();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(value);
+
+ int size = parents == null ? 0 : parents.size();
+ out.writeInt(size);
+ if (size != 0) {
+ for (long incomingId : parents) {
+ out.writeLong(incomingId);
+ }
+ }
+
+ out.writeBoolean(active);
+ }
+
+ /**
+ * Returns the list of parent vertices, i.e., vertices that are at the other
+ * end of incoming edges. If the vertex doesn't have any incoming edge, it
+ * returns null.
+ * @return List of the vertex's parents.
+ */
+ public LongArrayList getParents() {
+ return parents;
+ }
+
+ /**
+ * Adds a vertex id to the list of parent vertices.
+ * @param vertexId It of the parent vertex.
+ */
+ public void addParent(long vertexId) {
+ // Initialize the list of parent vertices only when one attempts to add
+ // the first item, so we save some memory on vertices that have no incoming
+ // edges
+ if (parents == null) {
+ parents = new LongArrayList();
+ }
+ parents.add(vertexId);
+ }
+
+ /**
+ * Clear parents list.
+ */
+ public void clearParents() {
+ parents = null;
+ }
+
+ /**
+ * Sets the vertex value. At the end of the SCC computation, vertices with the
+ * same vertex value are part of the same component.
+ * @param value Vertex value.
+ */
+ public void set(long value) {
+ this.value = value;
+ }
+
+ /**
+ * Returns the vertex value. At the end of the SCC computation, vertices with
+ * the same vertex value are part of the same component.
+ * @return Current vertex value.
+ */
+ public long get() {
+ return value;
+ }
+
+ /**
+ * Remove this vertex from the computation.
+ */
+ public void deactivate() {
+ this.active = false;
+ }
+
+ /**
+ * Indicates whether the vertex was removed in a Trimming phase.
+ * @return True if the vertex was trimmed, otherwise, return false.
+ */
+ public boolean isActive() {
+ return active;
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java
new file mode 100644
index 0000000..70e345a
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Classes for Strongly Connected Components computation.
+ */
+package org.apache.giraph.examples.scc;
http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
new file mode 100644
index 0000000..833c43e
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples.scc;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.giraph.utils.TestGraph;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link SccComputation}
+ */
+public class SccComputationTestInMemory {
+ @SuppressWarnings("unchecked")
+ public static Entry<LongWritable, NullWritable>[] makeEdges(long... args) {
+ Entry<LongWritable, NullWritable> result[] = new Entry[args.length];
+ for (int i = 0; i < args.length; i++) {
+ result[i] = new SimpleEntry<LongWritable, NullWritable>(new LongWritable(
+ args[i]), NullWritable.get());
+ }
+ return result;
+ }
+
+ /**
+ * Connects the {@outgoingVertices} to the given vertex id
+ * with null-valued edges.
+ *
+ * @param graph
+ * @param id
+ * @param outgoingVertices
+ */
+ public static void addVertex(
+ TestGraph<LongWritable, SccVertexValue, NullWritable> graph, long id,
+ long... outgoingVertices) {
+ graph.addVertex(new LongWritable(id), new SccVertexValue(id),
+ makeEdges(outgoingVertices));
+ }
+
+ @Test
+ public void testToyData() throws Exception {
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setComputationClass(SccComputation.class);
+ conf.setMasterComputeClass(SccPhaseMasterCompute.class);
+ conf.setOutEdgesClass(ByteArrayEdges.class);
+
+
+ TestGraph<LongWritable, SccVertexValue, NullWritable> graph = new TestGraph<LongWritable, SccVertexValue, NullWritable>(
+ conf);
+
+ addVertex(graph, 0, 1, 2, 4);
+ addVertex(graph, 1, 3, 20);
+ addVertex(graph, 2, 3);
+ addVertex(graph, 3, 0);
+ addVertex(graph, 20, 21);
+ addVertex(graph, 21, 22);
+ addVertex(graph, 22, 23);
+ addVertex(graph, 23, 24);
+ addVertex(graph, 24, 25);
+ addVertex(graph, 25, 20);
+ addVertex(graph, 4, 5);
+ addVertex(graph, 5, 6);
+
+ TestGraph<LongWritable, SccVertexValue, NullWritable> results = InternalVertexRunner.runWithInMemoryOutput(conf, graph);
+
+ Map<Long, List<Long>> scc = parse(results);
+
+ List<Long> components = scc.get(3l);
+ Collections.sort(components);
+ Assert.assertEquals(Arrays.asList(0l, 1l, 2l, 3l), components);
+
+ Assert.assertEquals(Arrays.asList(4l), scc.get(4l));
+ Assert.assertEquals(Arrays.asList(5l), scc.get(5l));
+ Assert.assertEquals(Arrays.asList(6l), scc.get(6l));
+
+ components = scc.get(25l);
+ Collections.sort(components);
+ Assert.assertEquals(Arrays.asList(20l, 21l, 22l, 23l, 24l, 25l), components);
+ }
+
+ private Map<Long, List<Long>> parse(
+ TestGraph<LongWritable, SccVertexValue, NullWritable> g) {
+ Map<Long, List<Long>> scc = new HashMap<Long, List<Long>>();
+ for (LongWritable v : g.getVertices().keySet()) {
+ Vertex<LongWritable, SccVertexValue, NullWritable> vertex = g
+ .getVertex(v);
+ long sccId = vertex.getValue().get();
+ List<Long> verticesIds = scc.get(sccId);
+ if (verticesIds == null) {// New SCC
+ List<Long> newScc = new ArrayList<Long>();
+ newScc.add(vertex.getId().get());
+ scc.put(sccId, newScc);
+ } else {
+ verticesIds.add(vertex.getId().get());
+ }
+ }
+ return scc;
+ }
+}