You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [6/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VerifyMessage.java Thu Feb 16 22:12:31 2012
@@ -18,7 +18,8 @@
package org.apache.giraph.examples;
-import org.apache.giraph.graph.*;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -36,153 +37,174 @@ import java.util.Iterator;
* appropriate location and superstep.
*/
public class VerifyMessage {
- public static class VerifiableMessage implements Writable {
- /** Superstep sent on */
- public long superstep;
- /** Source vertex id */
- public long sourceVertexId;
- /** Value */
- public float value;
-
- public VerifiableMessage() {}
-
- public VerifiableMessage(
- long superstep, long sourceVertexId, float value) {
- this.superstep = superstep;
- this.sourceVertexId = sourceVertexId;
- this.value = value;
- }
+ /**
+ * Message that will be sent in {@link VerifyMessageVertex}.
+ */
+ public static class VerifiableMessage implements Writable {
+ /** Superstep sent on */
+ private long superstep;
+ /** Source vertex id */
+ private long sourceVertexId;
+ /** Value */
+ private float value;
+
+ /**
+ * Default constructor used with reflection.
+ */
+ public VerifiableMessage() { }
+
+ /**
+ * Constructor with verifiable arguments.
+ * @param superstep Superstep this message was created on.
+ * @param sourceVertexId Who sent this message.
+ * @param value A value associated with this message.
+ */
+ public VerifiableMessage(
+ long superstep, long sourceVertexId, float value) {
+ this.superstep = superstep;
+ this.sourceVertexId = sourceVertexId;
+ this.value = value;
+ }
- @Override
- public void readFields(DataInput input) throws IOException {
- superstep = input.readLong();
- sourceVertexId = input.readLong();
- value = input.readFloat();
- }
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ superstep = input.readLong();
+ sourceVertexId = input.readLong();
+ value = input.readFloat();
+ }
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeLong(superstep);
- output.writeLong(sourceVertexId);
- output.writeFloat(value);
- }
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeLong(superstep);
+ output.writeLong(sourceVertexId);
+ output.writeFloat(value);
+ }
- @Override
- public String toString() {
- return "(superstep=" + superstep + ",sourceVertexId=" +
- sourceVertexId + ",value=" + value + ")";
- }
+ @Override
+ public String toString() {
+ return "(superstep=" + superstep + ",sourceVertexId=" +
+ sourceVertexId + ",value=" + value + ")";
}
+ }
- public static class VerifyMessageVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable,
- VerifiableMessage> {
- /** User can access this after the application finishes if local */
- public static long finalSum;
- /** Number of supersteps to run (6 by default) */
- private static int supersteps = 6;
- /** Class logger */
- private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
-
- /** Dynamically set number of supersteps */
- public static final String SUPERSTEP_COUNT =
- "verifyMessageVertex.superstepCount";
-
- public static class VerifyMessageVertexWorkerContext extends
- WorkerContext {
- @Override
- public void preApplication() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- sumAggregator.setAggregatedValue(new LongWritable(0));
- supersteps = getContext().getConfiguration().getInt(
- SUPERSTEP_COUNT, supersteps);
- }
-
- @Override
- public void postApplication() {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- finalSum = sumAggregator.getAggregatedValue().get();
- }
-
- @Override
- public void preSuperstep() {
- useAggregator(LongSumAggregator.class.getName());
- }
+ /**
+ * Send and verify messages.
+ */
+ public static class VerifyMessageVertex extends
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable,
+ VerifiableMessage> {
+ /** Dynamically set number of supersteps */
+ public static final String SUPERSTEP_COUNT =
+ "verifyMessageVertex.superstepCount";
+ /** User can access this after the application finishes if local */
+ private static long FINAL_SUM;
+ /** Number of supersteps to run (6 by default) */
+ private static int SUPERSTEPS = 6;
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(VerifyMessageVertex.class);
- @Override
- public void postSuperstep() {}
- }
+ public static long getFinalSum() {
+ return FINAL_SUM;
+ }
- @Override
- public void compute(Iterator<VerifiableMessage> msgIterator) {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- if (getSuperstep() > supersteps) {
- voteToHalt();
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: " + sumAggregator);
- }
- sumAggregator.aggregate(getVertexId().get());
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: sum = " +
- sumAggregator.getAggregatedValue().get() +
- " for vertex " + getVertexId());
- }
- float msgValue = 0.0f;
- while (msgIterator.hasNext()) {
- VerifiableMessage msg = msgIterator.next();
- msgValue += msg.value;
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: got msg = " + msg +
- " for vertex id " + getVertexId() +
- ", vertex value " + getVertexValue() +
- " on superstep " + getSuperstep());
- }
- if (msg.superstep != getSuperstep() - 1) {
- throw new IllegalStateException(
- "compute: Impossible to not get a messsage from " +
- "the previous superstep, current superstep = " +
- getSuperstep());
- }
- if ((msg.sourceVertexId != getVertexId().get() - 1) &&
- (getVertexId().get() != 0)) {
- throw new IllegalStateException(
- "compute: Impossible that this message didn't come " +
- "from the previous vertex and came from " +
- msg.sourceVertexId);
- }
- }
- int vertexValue = getVertexValue().get();
- setVertexValue(new IntWritable(vertexValue + (int) msgValue));
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getVertexId() +
- " has value " + getVertexValue() +
- " on superstep " + getSuperstep());
- }
- for (LongWritable targetVertexId : this) {
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
- if (LOG.isDebugEnabled()) {
- LOG.debug("compute: vertex " + getVertexId() +
- " sending edgeValue " + edgeValue +
- " vertexValue " + vertexValue +
- " total " +
- (edgeValue.get() + (float) vertexValue) +
- " to vertex " + targetVertexId +
- " on superstep " + getSuperstep());
- }
- edgeValue.set(edgeValue.get() + (float) vertexValue);
- addEdge(targetVertexId, edgeValue);
- sendMsg(targetVertexId,
- new VerifiableMessage(
- getSuperstep(), getVertexId().get(), edgeValue.get()));
- }
- }
+ /**
+ * Worker context used with {@link VerifyMessageVertex}.
+ */
+ public static class VerifyMessageVertexWorkerContext extends
+ WorkerContext {
+ @Override
+ public void preApplication() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ sumAggregator.setAggregatedValue(new LongWritable(0));
+ SUPERSTEPS = getContext().getConfiguration().getInt(
+ SUPERSTEP_COUNT, SUPERSTEPS);
+ }
+
+ @Override
+ public void postApplication() {
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ FINAL_SUM = sumAggregator.getAggregatedValue().get();
+ }
+
+ @Override
+ public void preSuperstep() {
+ useAggregator(LongSumAggregator.class.getName());
+ }
+
+ @Override
+ public void postSuperstep() { }
+ }
+
+ @Override
+ public void compute(Iterator<VerifiableMessage> msgIterator) {
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ if (getSuperstep() > SUPERSTEPS) {
+ voteToHalt();
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compute: " + sumAggregator);
+ }
+ sumAggregator.aggregate(getVertexId().get());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compute: sum = " +
+ sumAggregator.getAggregatedValue().get() +
+ " for vertex " + getVertexId());
+ }
+ float msgValue = 0.0f;
+ while (msgIterator.hasNext()) {
+ VerifiableMessage msg = msgIterator.next();
+ msgValue += msg.value;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compute: got msg = " + msg +
+ " for vertex id " + getVertexId() +
+ ", vertex value " + getVertexValue() +
+ " on superstep " + getSuperstep());
+ }
+ if (msg.superstep != getSuperstep() - 1) {
+ throw new IllegalStateException(
+ "compute: Impossible to not get a messsage from " +
+ "the previous superstep, current superstep = " +
+ getSuperstep());
+ }
+ if ((msg.sourceVertexId != getVertexId().get() - 1) &&
+ (getVertexId().get() != 0)) {
+ throw new IllegalStateException(
+ "compute: Impossible that this message didn't come " +
+ "from the previous vertex and came from " +
+ msg.sourceVertexId);
+ }
+ }
+ int vertexValue = getVertexValue().get();
+ setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compute: vertex " + getVertexId() +
+ " has value " + getVertexValue() +
+ " on superstep " + getSuperstep());
+ }
+ for (LongWritable targetVertexId : this) {
+ FloatWritable edgeValue = getEdgeValue(targetVertexId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("compute: vertex " + getVertexId() +
+ " sending edgeValue " + edgeValue +
+ " vertexValue " + vertexValue +
+ " total " +
+ (edgeValue.get() + (float) vertexValue) +
+ " to vertex " + targetVertexId +
+ " on superstep " + getSuperstep());
+ }
+ edgeValue.set(edgeValue.get() + (float) vertexValue);
+ addEdge(targetVertexId, edgeValue);
+ sendMsg(targetVertexId,
+ new VerifiableMessage(
+ getSuperstep(), getVertexId().get(), edgeValue.get()));
+ }
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/VertexWithComponentTextOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.examples;
@@ -33,39 +33,44 @@ import java.io.IOException;
* Text-based {@link org.apache.giraph.graph.VertexOutputFormat} for usage with
* {@link ConnectedComponentsVertex}
*
- * Each line consists of a vertex and its associated component (represented by the smallest
- * vertex id in the component)
+ * Each line consists of a vertex and its associated component (represented
+ * by the smallest vertex id in the component)
*/
public class VertexWithComponentTextOutputFormat extends
- TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
-
- @Override
- public VertexWriter<IntWritable, IntWritable, NullWritable>
- createVertexWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
- RecordWriter<Text, Text> recordWriter =
- textOutputFormat.getRecordWriter(context);
- return new VertexWithComponentWriter(recordWriter);
+ TextVertexOutputFormat<IntWritable, IntWritable, NullWritable> {
+ @Override
+ public VertexWriter<IntWritable, IntWritable, NullWritable>
+ createVertexWriter(TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ RecordWriter<Text, Text> recordWriter =
+ textOutputFormat.getRecordWriter(context);
+ return new VertexWithComponentWriter(recordWriter);
+ }
+
+ /**
+ * Vertex writer used with {@link VertexWithComponentTextOutputFormat}.
+ */
+ public static class VertexWithComponentWriter extends
+ TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
+ NullWritable> {
+ /**
+ * Constructor with record writer.
+ *
+ * @param writer Where the vertices will finally be written.
+ */
+ public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
+ super(writer);
}
- public static class VertexWithComponentWriter extends
- TextVertexOutputFormat.TextVertexWriter<IntWritable, IntWritable,
- NullWritable> {
-
- public VertexWithComponentWriter(RecordWriter<Text, Text> writer) {
- super(writer);
- }
-
- @Override
- public void writeVertex(BasicVertex<IntWritable, IntWritable,
- NullWritable,?> vertex) throws IOException,
- InterruptedException {
- StringBuilder output = new StringBuilder();
- output.append(vertex.getVertexId().get());
- output.append('\t');
- output.append(vertex.getVertexValue().get());
- getRecordWriter().write(new Text(output.toString()), null);
- }
-
+ @Override
+ public void writeVertex(BasicVertex<IntWritable, IntWritable,
+ NullWritable, ?> vertex) throws IOException,
+ InterruptedException {
+ StringBuilder output = new StringBuilder();
+ output.append(vertex.getVertexId().get());
+ output.append('\t');
+ output.append(vertex.getVertexValue().get());
+ getRecordWriter().write(new Text(output.toString()), null);
}
-}
\ No newline at end of file
+ }
+}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of Giraph examples.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.examples;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Aggregator.java Thu Feb 16 22:12:31 2012
@@ -24,39 +24,39 @@ import org.apache.hadoop.io.Writable;
* Interface for Aggregator. Allows aggregate operations for all vertices
* in a given superstep.
*
- * @param <A extends Writable> Aggregated value
+ * @param <A> Aggregated value
*/
public interface Aggregator<A extends Writable> {
- /**
- * Add a new value.
- * Needs to be commutative and associative
- *
- * @param value
- */
- void aggregate(A value);
+ /**
+ * Add a new value.
+ * Needs to be commutative and associative
+ *
+ * @param value Value to be aggregated.
+ */
+ void aggregate(A value);
- /**
- * Set aggregated value.
- * Can be used for initialization or reset.
- *
- * @param value
- */
- void setAggregatedValue(A value);
+ /**
+ * Set aggregated value.
+ * Can be used for initialization or reset.
+ *
+ * @param value Value to be set.
+ */
+ void setAggregatedValue(A value);
- /**
- * Return current aggregated value.
- * Needs to be initialized if aggregate or setAggregatedValue
- * have not been called before.
- *
- * @return A
- */
- A getAggregatedValue();
+ /**
+ * Return current aggregated value.
+ * Needs to be initialized if aggregate or setAggregatedValue
+ * have not been called before.
+ *
+ * @return Aggregated value
+ */
+ A getAggregatedValue();
- /**
- * Return new aggregated value.
- * Must be changeable without affecting internals of Aggregator
- *
- * @return Writable
- */
- A createAggregatedValue();
+ /**
+ * Return new aggregated value.
+ * Must be changeable without affecting internals of Aggregator
+ *
+ * @return Writable
+ */
+ A createAggregatedValue();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorUsage.java Thu Feb 16 22:12:31 2012
@@ -24,34 +24,35 @@ import org.apache.hadoop.io.Writable;
* Vertex classes can use this interface to register and use aggregators
*/
public interface AggregatorUsage {
- /**
- * Register an aggregator in preSuperstep() and/or preApplication().
- *
- * @param name of aggregator
- * @param aggregatorClass Class type of the aggregator
- * @return created Aggregator or null when already registered
- */
- public <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException;
+ /**
+ * Register an aggregator in preSuperstep() and/or preApplication().
+ *
+ * @param <A> Aggregator type
+ * @param name Name of aggregator
+ * @param aggregatorClass Class type of the aggregator
+ * @return created Aggregator or null when already registered
+ */
+ <A extends Writable> Aggregator<A> registerAggregator(
+ String name,
+ Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException;
- /**
- * Get a registered aggregator.
- *
- * @param name Name of aggregator
- * @return Aggregator<A> (null when not registered)
- */
- public Aggregator<? extends Writable> getAggregator(String name);
+ /**
+ * Get a registered aggregator.
+ *
+ * @param name Name of aggregator
+ * @return Aggregator (null when not registered)
+ */
+ Aggregator<? extends Writable> getAggregator(String name);
- /**
- * Use a registered aggregator in current superstep.
- * Even when the same aggregator should be used in the next
- * superstep, useAggregator needs to be called at the beginning
- * of that superstep in preSuperstep().
- *
- * @param name Name of aggregator
- * @return boolean (false when not registered)
- */
- public boolean useAggregator(String name);
+ /**
+ * Use a registered aggregator in current superstep.
+ * Even when the same aggregator should be used in the next
+ * superstep, useAggregator needs to be called at the beginning
+ * of that superstep in preSuperstep().
+ *
+ * @param name Name of aggregator
+ * @return boolean (false when not registered)
+ */
+ boolean useAggregator(String name);
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/AggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -25,49 +25,49 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
- * An AggregatorWriter is used to export Aggregators during or at the end of
+ * An AggregatorWriter is used to export Aggregators during or at the end of
* each computation. It runs on the master and it's called at the end of each
- * superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
- * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
+ * superstep. The special signal {@link AggregatorWriter#LAST_SUPERSTEP} is
+ * passed to {@link AggregatorWriter#writeAggregator(Map, long)} as the
* superstep value to signal the end of computation.
*/
public interface AggregatorWriter {
- /** Signal for last superstep */
- public static final int LAST_SUPERSTEP = -1;
+ /** Signal for last superstep */
+ int LAST_SUPERSTEP = -1;
- /**
- * The method is called at the initialization of the AggregatorWriter.
- * More precisely, the aggregatorWriter is initialized each time a new
- * master is elected.
- *
- * @param context Mapper Context where the master is running on
- * @param applicationAttempt ID of the applicationAttempt, used to
- * disambiguate aggregator writes for different attempts
- * @throws IOException
- */
- @SuppressWarnings("rawtypes")
- void initialize(Context context, long applicationAttempt) throws IOException;
+ /**
+ * The method is called at the initialization of the AggregatorWriter.
+ * More precisely, the aggregatorWriter is initialized each time a new
+ * master is elected.
+ *
+ * @param context Mapper Context where the master is running on
+ * @param applicationAttempt ID of the applicationAttempt, used to
+ * disambiguate aggregator writes for different attempts
+ * @throws IOException
+ */
+ @SuppressWarnings("rawtypes")
+ void initialize(Context context, long applicationAttempt) throws IOException;
- /**
- * The method is called at the end of each superstep. The user might decide
- * whether to write the aggregators values for the current superstep. For
- * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
- *
- * @param aggregatorMap Map of aggregators to write
- * @param superstep Current superstep
- * @throws IOException
- */
- void writeAggregator(
- Map<String, Aggregator<Writable>> aggregatorMap,
- long superstep) throws IOException;
+ /**
+ * The method is called at the end of each superstep. The user might decide
+ * whether to write the aggregators values for the current superstep. For
+ * the last superstep, {@link AggregatorWriter#LAST_SUPERSTEP} is passed.
+ *
+ * @param aggregatorMap Map of aggregators to write
+ * @param superstep Current superstep
+ * @throws IOException
+ */
+ void writeAggregator(
+ Map<String, Aggregator<Writable>> aggregatorMap,
+ long superstep) throws IOException;
- /**
- * The method is called at the end of a successful computation. The method
- * is not called when the job fails and a new master is elected. For this
- * reason it's advised to flush data at the end of
- * {@link AggregatorWriter#writeAggregator(Map, long)}.
- *
- * @throws IOException
- */
- void close() throws IOException;
+ /**
+ * The method is called at the end of a successful computation. The method
+ * is not called when the job fails and a new master is elected. For this
+ * reason it's advised to flush data at the end of
+ * {@link AggregatorWriter#writeAggregator(Map, long)}.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertex.java Thu Feb 16 22:12:31 2012
@@ -1,4 +1,4 @@
- /*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,249 +28,257 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
- /**
+/**
* Basic interface for writing a BSP application for computation.
*
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- * @param <M> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class BasicVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements AggregatorUsage, Iterable<I>, Writable, Configurable {
- /** Global graph state **/
- private GraphState<I,V,E,M> graphState;
- /** Configuration */
- private Configuration conf;
- /** If true, do not do anymore computation on this vertex. */
- boolean halt = false;
-
- /**
- * This method must be called after instantiation of a vertex with BspUtils
- * unless deserialization from readFields() is called.
- *
- * @param vertexId Will be the vertex id
- * @param vertexValue Will be the vertex value
- * @param edges A map of destination edge ids to edge values (can be null)
- * @param messages Initial messages for this vertex (can be null)
- */
- public abstract void initialize(
- I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages);
-
- /**
- * Must be defined by user to do computation on a single Vertex.
- *
- * @param msgIterator Iterator to the messages that were sent to this
- * vertex in the previous superstep
- * @throws IOException
- */
- public abstract void compute(Iterator<M> msgIterator) throws IOException;
-
- /**
- * Retrieves the current superstep.
- *
- * @return Current superstep
- */
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- /**
- * Get the vertex id
- */
- public abstract I getVertexId();
-
- /**
- * Get the vertex value (data stored with vertex)
- *
- * @return Vertex value
- */
- public abstract V getVertexValue();
-
- /**
- * Set the vertex data (immediately visible in the computation)
- *
- * @param vertexValue Vertex data to be set
- */
- public abstract void setVertexValue(V vertexValue);
-
- /**
- * Get the total (all workers) number of vertices that
- * existed in the previous superstep.
- *
- * @return Total number of vertices (-1 if first superstep)
- */
- public long getNumVertices() {
- return getGraphState().getNumVertices();
- }
-
- /**
- * Get the total (all workers) number of edges that
- * existed in the previous superstep.
- *
- * @return Total number of edges (-1 if first superstep)
- */
- public long getNumEdges() {
- return getGraphState().getNumEdges();
- }
-
- /**
- * Get a read-only view of the out-edges of this vertex.
- *
- * @return the out edges (sort order determined by subclass implementation).
- */
- @Override
- public abstract Iterator<I> iterator();
-
- /**
- * Get the edge value associated with a target vertex id.
- *
- * @param targetVertexId Target vertex id to check
- *
- * @return the value of the edge to targetVertexId (or null if there
- * is no edge to it)
- */
- public abstract E getEdgeValue(I targetVertexId);
-
- /**
- * Does an edge with the target vertex id exist?
- *
- * @param targetVertexId Target vertex id to check
- * @return true if there is an edge to the target
- */
- public abstract boolean hasEdge(I targetVertexId);
-
- /**
- * Get the number of outgoing edges on this vertex.
- *
- * @return the total number of outbound edges from this vertex
- */
- public abstract int getNumOutEdges();
-
- /**
- * Send a message to a vertex id. The message should not be mutated after
- * this method returns or else undefined results could occur.
- *
- * @param id Vertex id to send the message to
- * @param msg Message data to send. Note that after the message is sent,
- * the user should not modify the object.
- */
- public void sendMsg(I id, M msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- getGraphState().getWorkerCommunications().
- sendMessageReq(id, msg);
- }
-
- /**
- * Send a message to all edges.
- */
- public abstract void sendMsgToAllEdges(M msg);
-
- /**
- * After this is called, the compute() code will no longer be called for
- * this vertex unless a message is sent to it. Then the compute() code
- * will be called once again until this function is called. The
- * application finishes only when all vertices vote to halt.
- */
- public void voteToHalt() {
- halt = true;
- }
-
- /**
- * Is this vertex done?
- */
- public boolean isHalted() {
- return halt;
- }
-
- /**
- * Get the list of incoming messages from the previous superstep. Same as
- * the message iterator passed to compute().
- */
- public abstract Iterable<M> getMessages();
-
- /**
- * Copy the messages this vertex should process in the current superstep
- *
- * @param messages the messages sent to this vertex in the previous superstep
- */
- abstract void putMessages(Iterable<M> messages);
-
- /**
- * Release unnecessary resources (will be called after vertex returns from
- * {@link #compute()})
- */
- abstract void releaseResources();
-
- /**
- * Get the graph state for all workers.
- *
- * @return Graph state for all workers
- */
- GraphState<I, V, E, M> getGraphState() {
- return graphState;
- }
-
- /**
- * Set the graph state for all workers
- *
- * @param graphState Graph state for all workers
- */
- void setGraphState(GraphState<I, V, E, M> graphState) {
- this.graphState = graphState;
- }
-
- /**
- * Get the mapper context
- *
- * @return Mapper context
- */
- public Mapper.Context getContext() {
- return getGraphState().getContext();
- }
-
- /**
- * Get the worker context
- *
- * @return WorkerContext context
- */
- public WorkerContext getWorkerContext() {
- return getGraphState().getGraphMapper().getWorkerContext();
- }
-
- @Override
- public final <A extends Writable> Aggregator<A> registerAggregator(
- String name,
- Class<? extends Aggregator<A>> aggregatorClass)
- throws InstantiationException, IllegalAccessException {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- registerAggregator(name, aggregatorClass);
- }
-
- @Override
- public final Aggregator<? extends Writable> getAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- getAggregator(name);
- }
-
- @Override
- public final boolean useAggregator(String name) {
- return getGraphState().getGraphMapper().getAggregatorUsage().
- useAggregator(name);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ implements AggregatorUsage, Iterable<I>, Writable, Configurable {
+ /** If true, do not do any more computation on this vertex. */
+ protected boolean halt = false;
+ /** Global graph state **/
+ private GraphState<I, V, E, M> graphState;
+ /** Configuration */
+ private Configuration conf;
+
+
+ /**
+ * This method must be called after instantiation of a vertex with BspUtils
+ * unless deserialization from readFields() is called.
+ *
+ * @param vertexId Will be the vertex id
+ * @param vertexValue Will be the vertex value
+ * @param edges A map of destination edge ids to edge values (can be null)
+ * @param messages Initial messages for this vertex (can be null)
+ */
+ public abstract void initialize(
+ I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages);
+
+ /**
+ * Must be defined by user to do computation on a single Vertex.
+ *
+ * @param msgIterator Iterator to the messages that were sent to this
+ * vertex in the previous superstep
+ * @throws IOException
+ */
+ public abstract void compute(Iterator<M> msgIterator) throws IOException;
+
+ /**
+ * Retrieves the current superstep.
+ *
+ * @return Current superstep
+ */
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ /**
+ * Get the vertex id.
+ *
+ * @return My vertex id.
+ */
+ public abstract I getVertexId();
+
+ /**
+ * Get the vertex value (data stored with vertex)
+ *
+ * @return Vertex value
+ */
+ public abstract V getVertexValue();
+
+ /**
+ * Set the vertex data (immediately visible in the computation)
+ *
+ * @param vertexValue Vertex data to be set
+ */
+ public abstract void setVertexValue(V vertexValue);
+
+ /**
+ * Get the total (all workers) number of vertices that
+ * existed in the previous superstep.
+ *
+ * @return Total number of vertices (-1 if first superstep)
+ */
+ public long getNumVertices() {
+ return getGraphState().getNumVertices();
+ }
+
+ /**
+ * Get the total (all workers) number of edges that
+ * existed in the previous superstep.
+ *
+ * @return Total number of edges (-1 if first superstep)
+ */
+ public long getNumEdges() {
+ return getGraphState().getNumEdges();
+ }
+
+ /**
+ * Get a read-only view of the out-edges of this vertex.
+ *
+ * @return the out edges (sort order determined by subclass implementation).
+ */
+ @Override
+ public abstract Iterator<I> iterator();
+
+ /**
+ * Get the edge value associated with a target vertex id.
+ *
+ * @param targetVertexId Target vertex id to check
+ *
+ * @return the value of the edge to targetVertexId (or null if there
+ * is no edge to it)
+ */
+ public abstract E getEdgeValue(I targetVertexId);
+
+ /**
+ * Does an edge with the target vertex id exist?
+ *
+ * @param targetVertexId Target vertex id to check
+ * @return true if there is an edge to the target
+ */
+ public abstract boolean hasEdge(I targetVertexId);
+
+ /**
+ * Get the number of outgoing edges on this vertex.
+ *
+ * @return the total number of outbound edges from this vertex
+ */
+ public abstract int getNumOutEdges();
+
+ /**
+ * Send a message to a vertex id. The message should not be mutated after
+ * this method returns or else undefined results could occur.
+ *
+ * @param id Vertex id to send the message to
+ * @param msg Message data to send. Note that after the message is sent,
+ * the user should not modify the object.
+ */
+ public void sendMsg(I id, M msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsg: Cannot send null message to " + id);
+ }
+ getGraphState().getWorkerCommunications().
+ sendMessageReq(id, msg);
+ }
+
+ /**
+ * Send a message to all edges.
+ *
+ * @param msg Message sent to all edges.
+ */
+ public abstract void sendMsgToAllEdges(M msg);
+
+ /**
+ * After this is called, the compute() code will no longer be called for
+ * this vertex unless a message is sent to it. Then the compute() code
+ * will be called once again until this function is called. The
+ * application finishes only when all vertices vote to halt.
+ */
+ public void voteToHalt() {
+ halt = true;
+ }
+
+ /**
+ * Is this vertex done?
+ *
+ * @return True if halted, false otherwise.
+ */
+ public boolean isHalted() {
+ return halt;
+ }
+
+ /**
+ * Get the list of incoming messages from the previous superstep. Same as
+ * the message iterator passed to compute().
+ *
+ * @return Iterable of messages.
+ */
+ public abstract Iterable<M> getMessages();
+
+ /**
+ * Copy the messages this vertex should process in the current superstep
+ *
+ * @param messages the messages sent to this vertex in the previous superstep
+ */
+ abstract void putMessages(Iterable<M> messages);
+
+ /**
+ * Release unnecessary resources (will be called after vertex returns from
+ * {@link #compute(Iterator)})
+ */
+ abstract void releaseResources();
+
+ /**
+ * Get the graph state for all workers.
+ *
+ * @return Graph state for all workers
+ */
+ GraphState<I, V, E, M> getGraphState() {
+ return graphState;
+ }
+
+ /**
+ * Set the graph state for all workers
+ *
+ * @param graphState Graph state for all workers
+ */
+ void setGraphState(GraphState<I, V, E, M> graphState) {
+ this.graphState = graphState;
+ }
+
+ /**
+ * Get the mapper context
+ *
+ * @return Mapper context
+ */
+ public Mapper.Context getContext() {
+ return getGraphState().getContext();
+ }
+
+ /**
+ * Get the worker context
+ *
+ * @return WorkerContext context
+ */
+ public WorkerContext getWorkerContext() {
+ return getGraphState().getGraphMapper().getWorkerContext();
+ }
+
+ @Override
+ public final <A extends Writable> Aggregator<A> registerAggregator(
+ String name, Class<? extends Aggregator<A>> aggregatorClass)
+ throws InstantiationException, IllegalAccessException {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ registerAggregator(name, aggregatorClass);
+ }
+
+ @Override
+ public final Aggregator<? extends Writable> getAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ getAggregator(name);
+ }
+
+ @Override
+ public final boolean useAggregator(String name) {
+ return getGraphState().getGraphMapper().getAggregatorUsage().
+ useAggregator(name);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Thu Feb 16 22:12:31 2012
@@ -24,36 +24,38 @@ import org.apache.hadoop.io.WritableComp
/**
* Handles all the situations that can arise upon creation/removal of
* vertices and edges.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public interface BasicVertexResolver<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> {
- /**
- * A vertex may have been removed, created zero or more times and had
- * zero or more messages sent to it. This method will handle all situations
- * excluding the normal case (a vertex already exists and has zero or more
- * messages sent it to).
- *
- * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
- * initialize())
- * @param vertex Original vertex or null if none
- * @param vertexChanges Changes that happened to this vertex or null if none
- * @param messages messages received in the last superstep or null if none
- * @return Vertex to be returned, if null, and a vertex currently exists
- * it will be removed
- */
- BasicVertex<I, V, E, M> resolve(I vertexId,
- BasicVertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges,
- Iterable<M> messages);
+public interface BasicVertexResolver<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * A vertex may have been removed, created zero or more times and had
+ * zero or more messages sent to it. This method will handle all situations
+ * excluding the normal case (a vertex already exists and has zero or more
+ * messages sent to it).
+ *
+ * @param vertexId Vertex id (can be used for {@link BasicVertex}'s
+ * initialize())
+ * @param vertex Original vertex or null if none
+ * @param vertexChanges Changes that happened to this vertex or null if none
+ * @param messages messages received in the last superstep or null if none
+ * @return Vertex to be returned, if null, and a vertex currently exists
+ * it will be removed
+ */
+ BasicVertex<I, V, E, M> resolve(I vertexId,
+ BasicVertex<I, V, E, M> vertex,
+ VertexChanges<I, V, E, M> vertexChanges,
+ Iterable<M> messages);
- /**
- * Create a default vertex that can be used to return from resolve().
- *
- * @return Newly instantiated vertex.
- */
- BasicVertex<I, V, E, M> instantiateVertex();
+ /**
+ * Create a default vertex that can be used to return from resolve().
+ *
+ * @return Newly instantiated vertex.
+ */
+ BasicVertex<I, V, E, M> instantiateVertex();
}