You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [13/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/example...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Feb 16 22:12:31 2012
@@ -34,279 +34,304 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
+/**
+ * Optimized vertex implementation for
+ * <LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ */
public abstract class LongDoubleFloatDoubleVertex extends
- MutableVertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(LongDoubleFloatDoubleVertex.class);
-
- private long vertexId;
- private double vertexValue;
- private OpenLongFloatHashMap verticesWithEdgeValues =
- new OpenLongFloatHashMap();
- private DoubleArrayList messageList = new DoubleArrayList();
-
- @Override
- public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
- Map<LongWritable, FloatWritable> edgesW,
- Iterable<DoubleWritable> messagesW) {
- if (vertexIdW != null ) {
- vertexId = vertexIdW.get();
- }
- if (vertexValueW != null) {
- vertexValue = vertexValueW.get();
- }
- if (edgesW != null) {
- for (Map.Entry<LongWritable, FloatWritable> entry :
- edgesW.entrySet()) {
- verticesWithEdgeValues.put(entry.getKey().get(),
- entry.getValue().get());
- }
- }
- if (messagesW != null) {
- for(DoubleWritable m : messagesW) {
- messageList.add(m.get());
- }
- }
- }
-
- @Override
- public final boolean addEdge(LongWritable targetId,
- FloatWritable edgeValue) {
- if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("addEdge: Vertex=" + vertexId +
- ": already added an edge value for dest vertex id " +
- targetId.get());
- }
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public FloatWritable removeEdge(LongWritable targetVertexId) {
- long target = targetVertexId.get();
- if (verticesWithEdgeValues.containsKey(target)) {
- float value = verticesWithEdgeValues.get(target);
- verticesWithEdgeValues.removeKey(target);
- return new FloatWritable(value);
- } else {
- return null;
- }
- }
-
- @Override
- public final void setVertexId(LongWritable vertexId) {
- this.vertexId = vertexId.get();
- }
-
- @Override
- public final LongWritable getVertexId() {
- // TODO: possibly not make new objects every time?
- return new LongWritable(vertexId);
- }
-
- @Override
- public final DoubleWritable getVertexValue() {
- return new DoubleWritable(vertexValue);
- }
-
- @Override
- public final void setVertexValue(DoubleWritable vertexValue) {
- this.vertexValue = vertexValue.get();
- }
-
- @Override
- public final void sendMsg(LongWritable id, DoubleWritable msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsg: Cannot send null message to " + id);
- }
- getGraphState().getWorkerCommunications().sendMessageReq(id, msg);
- }
-
- @Override
- public final void sendMsgToAllEdges(final DoubleWritable msg) {
- if (msg == null) {
- throw new IllegalArgumentException(
- "sendMsgToAllEdges: Cannot send null message to all edges");
- }
- final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> vertex = this;
- verticesWithEdgeValues.forEachKey(new LongProcedure() {
- @Override
- public boolean apply(long destVertexId) {
- vertex.sendMsg(new LongWritable(destVertexId), msg);
- return true;
- }
- });
- }
-
- @Override
- public long getNumVertices() {
- return getGraphState().getNumVertices();
- }
-
- @Override
- public long getNumEdges() {
- return getGraphState().getNumEdges();
- }
-
- @Override
- public Iterator<LongWritable> iterator() {
- final long[] destVertices = verticesWithEdgeValues.keys().elements();
- final int destVerticesSize = verticesWithEdgeValues.size();
- return new Iterator<LongWritable>() {
- int offset = 0;
- @Override public boolean hasNext() {
- return offset < destVerticesSize;
- }
-
- @Override public LongWritable next() {
- return new LongWritable(destVertices[offset++]);
- }
-
- @Override public void remove() {
- throw new UnsupportedOperationException(
- "Mutation disallowed for edge list via iterator");
- }
- };
- }
-
- @Override
- public FloatWritable getEdgeValue(LongWritable targetVertexId) {
- return new FloatWritable(
- verticesWithEdgeValues.get(targetVertexId.get()));
- }
-
- @Override
- public boolean hasEdge(LongWritable targetVertexId) {
- return verticesWithEdgeValues.containsKey(targetVertexId.get());
- }
-
- @Override
- public int getNumOutEdges() {
- return verticesWithEdgeValues.size();
- }
-
- @Override
- public long getSuperstep() {
- return getGraphState().getSuperstep();
- }
-
- @Override
- final public void readFields(DataInput in) throws IOException {
- vertexId = in.readLong();
- vertexValue = in.readDouble();
- long edgeMapSize = in.readLong();
- for (long i = 0; i < edgeMapSize; ++i) {
- long destVertexId = in.readLong();
- float edgeValue = in.readFloat();
- verticesWithEdgeValues.put(destVertexId, edgeValue);
- }
- long msgListSize = in.readLong();
- for (long i = 0; i < msgListSize; ++i) {
- messageList.add(in.readDouble());
- }
- halt = in.readBoolean();
- }
-
- @Override
- public final void write(final DataOutput out) throws IOException {
- out.writeLong(vertexId);
- out.writeDouble(vertexValue);
- out.writeLong(verticesWithEdgeValues.size());
- verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
- @Override
- public boolean apply(long destVertexId, float edgeValue) {
- try {
- out.writeLong(destVertexId);
- out.writeFloat(edgeValue);
- } catch (IOException e) {
- throw new IllegalStateException(
- "apply: IOException when not allowed", e);
- }
- return true;
- }
- });
- out.writeLong(messageList.size());
- messageList.forEach(new DoubleProcedure() {
- @Override
- public boolean apply(double message) {
- try {
- out.writeDouble(message);
- } catch (IOException e) {
- throw new IllegalStateException(
- "apply: IOException when not allowed", e);
- }
- return true;
- }
- });
- out.writeBoolean(halt);
- }
-
- @Override
- void putMessages(Iterable<DoubleWritable> messages) {
- messageList.clear();
- for (DoubleWritable message : messages) {
- messageList.add(message.get());
- }
- }
-
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- messageList.clear();
+ MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+ /** Long vertex id */
+ private long vertexId;
+ /** Double vertex value */
+ private double vertexValue;
+ /** Stores the edges */
+ private OpenLongFloatHashMap verticesWithEdgeValues =
+ new OpenLongFloatHashMap();
+ /** Message list storage */
+ private DoubleArrayList messageList = new DoubleArrayList();
+
+ @Override
+ public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
+ Map<LongWritable, FloatWritable> edgesW,
+ Iterable<DoubleWritable> messagesW) {
+ if (vertexIdW != null) {
+ vertexId = vertexIdW.get();
+ }
+ if (vertexValueW != null) {
+ vertexValue = vertexValueW.get();
+ }
+ if (edgesW != null) {
+ for (Map.Entry<LongWritable, FloatWritable> entry :
+ edgesW.entrySet()) {
+ verticesWithEdgeValues.put(entry.getKey().get(),
+ entry.getValue().get());
+ }
+ }
+ if (messagesW != null) {
+ for (DoubleWritable m : messagesW) {
+ messageList.add(m.get());
+ }
+ }
+ }
+
+ @Override
+ public final boolean addEdge(LongWritable targetId,
+ FloatWritable edgeValue) {
+ if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("addEdge: Vertex=" + vertexId +
+ ": already added an edge value for dest vertex id " +
+ targetId.get());
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public FloatWritable removeEdge(LongWritable targetVertexId) {
+ long target = targetVertexId.get();
+ if (verticesWithEdgeValues.containsKey(target)) {
+ float value = verticesWithEdgeValues.get(target);
+ verticesWithEdgeValues.removeKey(target);
+ return new FloatWritable(value);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public final void setVertexId(LongWritable vertexId) {
+ this.vertexId = vertexId.get();
+ }
+
+ @Override
+ public final LongWritable getVertexId() {
+ // TODO: possibly not make new objects every time?
+ return new LongWritable(vertexId);
+ }
+
+ @Override
+ public final DoubleWritable getVertexValue() {
+ return new DoubleWritable(vertexValue);
+ }
+
+ @Override
+ public final void setVertexValue(DoubleWritable vertexValue) {
+ this.vertexValue = vertexValue.get();
+ }
+
+ @Override
+ public final void sendMsg(LongWritable id, DoubleWritable msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsg: Cannot send null message to " + id);
+ }
+ getGraphState().getWorkerCommunications().sendMessageReq(id, msg);
+ }
+
+ @Override
+ public final void sendMsgToAllEdges(final DoubleWritable msg) {
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "sendMsgToAllEdges: Cannot send null message to all edges");
+ }
+ final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> vertex = this;
+ verticesWithEdgeValues.forEachKey(new LongProcedure() {
+ @Override
+ public boolean apply(long destVertexId) {
+ vertex.sendMsg(new LongWritable(destVertexId), msg);
+ return true;
+ }
+ });
+ }
+
+ @Override
+ public long getNumVertices() {
+ return getGraphState().getNumVertices();
+ }
+
+ @Override
+ public long getNumEdges() {
+ return getGraphState().getNumEdges();
+ }
+
+ @Override
+ public Iterator<LongWritable> iterator() {
+ final long[] destVertices = verticesWithEdgeValues.keys().elements();
+ final int destVerticesSize = verticesWithEdgeValues.size();
+ return new Iterator<LongWritable>() {
+ private int offset = 0;
+ @Override public boolean hasNext() {
+ return offset < destVerticesSize;
+ }
+
+ @Override public LongWritable next() {
+ return new LongWritable(destVertices[offset++]);
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException(
+ "Mutation disallowed for edge list via iterator");
+ }
+ };
+ }
+
+ @Override
+ public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+ return new FloatWritable(
+ verticesWithEdgeValues.get(targetVertexId.get()));
+ }
+
+ @Override
+ public boolean hasEdge(LongWritable targetVertexId) {
+ return verticesWithEdgeValues.containsKey(targetVertexId.get());
+ }
+
+ @Override
+ public int getNumOutEdges() {
+ return verticesWithEdgeValues.size();
+ }
+
+ @Override
+ public long getSuperstep() {
+ return getGraphState().getSuperstep();
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ vertexId = in.readLong();
+ vertexValue = in.readDouble();
+ long edgeMapSize = in.readLong();
+ for (long i = 0; i < edgeMapSize; ++i) {
+ long destVertexId = in.readLong();
+ float edgeValue = in.readFloat();
+ verticesWithEdgeValues.put(destVertexId, edgeValue);
+ }
+ long msgListSize = in.readLong();
+ for (long i = 0; i < msgListSize; ++i) {
+ messageList.add(in.readDouble());
+ }
+ halt = in.readBoolean();
+ }
+
+ @Override
+ public final void write(final DataOutput out) throws IOException {
+ out.writeLong(vertexId);
+ out.writeDouble(vertexValue);
+ out.writeLong(verticesWithEdgeValues.size());
+ verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
+ @Override
+ public boolean apply(long destVertexId, float edgeValue) {
+ try {
+ out.writeLong(destVertexId);
+ out.writeFloat(edgeValue);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "apply: IOException when not allowed", e);
+ }
+ return true;
+ }
+ });
+ out.writeLong(messageList.size());
+ messageList.forEach(new DoubleProcedure() {
+ @Override
+ public boolean apply(double message) {
+ try {
+ out.writeDouble(message);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "apply: IOException when not allowed", e);
+ }
+ return true;
+ }
+ });
+ out.writeBoolean(halt);
+ }
+
+ @Override
+ void putMessages(Iterable<DoubleWritable> messages) {
+ messageList.clear();
+ for (DoubleWritable message : messages) {
+ messageList.add(message.get());
+ }
+ }
+
+ @Override
+ void releaseResources() {
+ // Hint to GC to free the messages
+ messageList.clear();
+ }
+
+ @Override
+ public Iterable<DoubleWritable> getMessages() {
+ return new UnmodifiableDoubleWritableIterable(messageList);
+ }
+
+ @Override
+ public String toString() {
+ return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+ ",#edges=" + getNumOutEdges() + ")";
+ }
+
+ /**
+ * Helper iterable over the messages.
+ */
+ private class UnmodifiableDoubleWritableIterable
+ implements Iterable<DoubleWritable> {
+ /** Backing store of messages */
+ private final DoubleArrayList elementList;
+
+ /**
+ * Constructor.
+ *
+ * @param elementList Backing store of element list.
+ */
+ public UnmodifiableDoubleWritableIterable(
+ DoubleArrayList elementList) {
+ this.elementList = elementList;
+ }
+
+ @Override
+ public Iterator<DoubleWritable> iterator() {
+ return new UnmodifiableDoubleWritableIterator(
+ elementList);
+ }
+ }
+
+ /**
+ * Iterator over the messages.
+ */
+ private class UnmodifiableDoubleWritableIterator
+ extends UnmodifiableIterator<DoubleWritable> {
+ /** Double backing list */
+ private final DoubleArrayList elementList;
+ /** Offset into the backing list */
+ private int offset = 0;
+
+ /**
+ * Constructor.
+ *
+ * @param elementList Backing store of element list.
+ */
+ UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
+ this.elementList = elementList;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return offset < elementList.size();
}
@Override
- public Iterable<DoubleWritable> getMessages() {
- return new UnmodifiableDoubleWritableIterable(messageList);
- }
-
- @Override
- public String toString() {
- return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
- ",#edges=" + getNumOutEdges() + ")";
- }
-
- private class UnmodifiableDoubleWritableIterable
- implements Iterable<DoubleWritable> {
-
- private final DoubleArrayList elementList;
-
- public UnmodifiableDoubleWritableIterable(
- DoubleArrayList elementList) {
- this.elementList = elementList;
- }
-
- @Override
- public Iterator<DoubleWritable> iterator() {
- return new UnmodifiableDoubleWritableIterator(
- elementList);
- }
- }
-
- private class UnmodifiableDoubleWritableIterator
- extends UnmodifiableIterator<DoubleWritable> {
- private final DoubleArrayList elementList;
- private int offset = 0;
-
- UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
- this.elementList = elementList;
- }
-
- @Override
- public boolean hasNext() {
- return offset < elementList.size();
- }
-
- @Override
- public DoubleWritable next() {
- return new DoubleWritable(elementList.get(offset++));
- }
+ public DoubleWritable next() {
+ return new DoubleWritable(elementList.get(offset++));
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Thu Feb 16 22:12:31 2012
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
+import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -29,152 +30,166 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
/**
* Master thread that will coordinate the activities of the tasks. It runs
* on all task processes, however, will only execute its algorithm if it knows
* it is the "leader" from ZooKeeper.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
-public class MasterThread<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> extends Thread {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(MasterThread.class);
- /** Reference to shared BspService */
- private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
- /** Context (for counters) */
- private final Context context;
- /** Use superstep counters? */
- private final boolean superstepCounterOn;
- /** Setup seconds */
- private double setupSecs = 0d;
- /** Superstep timer (in seconds) map */
- private final Map<Long, Double> superstepSecsMap =
- new TreeMap<Long, Double>();
-
- /** Counter group name for the Giraph timers */
- public String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
-
- /**
- * Constructor.
- *
- * @param bspServiceMaster Master that already exists and setup() has
- * been called.
- */
- MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
- Context context) {
- super(MasterThread.class.getName());
- this.bspServiceMaster = bspServiceMaster;
- this.context = context;
- superstepCounterOn = context.getConfiguration().getBoolean(
- GiraphJob.USE_SUPERSTEP_COUNTERS,
- GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
- }
+public class MasterThread<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable> extends Thread {
+ /** Counter group name for the Giraph timers */
+ public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(MasterThread.class);
+ /** Reference to shared BspService */
+ private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
+ /** Context (for counters) */
+ private final Context context;
+ /** Use superstep counters? */
+ private final boolean superstepCounterOn;
+ /** Setup seconds */
+ private double setupSecs = 0d;
+ /** Superstep timer (in seconds) map */
+ private final Map<Long, Double> superstepSecsMap =
+ new TreeMap<Long, Double>();
+
+ /**
+ * Constructor.
+ *
+ * @param bspServiceMaster Master that already exists and setup() has
+ * been called.
+ * @param context Context from the Mapper.
+ */
+ MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
+ Context context) {
+ super(MasterThread.class.getName());
+ this.bspServiceMaster = bspServiceMaster;
+ this.context = context;
+ superstepCounterOn = context.getConfiguration().getBoolean(
+ GiraphJob.USE_SUPERSTEP_COUNTERS,
+ GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
+ }
+
+ /**
+ * The master algorithm. The algorithm should be able to withstand
+ * failures and resume as necessary since the master may switch during a
+ * job.
+ */
+ @Override
+ public void run() {
+ // Algorithm:
+ // 1. Become the master
+ // 2. If desired, restart from a manual checkpoint
+ // 3. Run all supersteps until complete
+ try {
+ long startMillis = System.currentTimeMillis();
+ long endMillis = 0;
+ bspServiceMaster.setup();
+ if (bspServiceMaster.becomeMaster()) {
+ // Attempt to create InputSplits if necessary. Bail out if that fails.
+ if (bspServiceMaster.getRestartedSuperstep() !=
+ BspService.UNSET_SUPERSTEP ||
+ bspServiceMaster.createInputSplits() != -1) {
+ long setupMillis = System.currentTimeMillis() - startMillis;
+ context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+ "Setup (milliseconds)").
+ increment(setupMillis);
+ setupSecs = setupMillis / 1000.0d;
+ SuperstepState superstepState = SuperstepState.INITIAL;
+ long cachedSuperstep = BspService.UNSET_SUPERSTEP;
+ while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
+ long startSuperstepMillis = System.currentTimeMillis();
+ cachedSuperstep = bspServiceMaster.getSuperstep();
+ superstepState = bspServiceMaster.coordinateSuperstep();
+ long superstepMillis = System.currentTimeMillis() -
+ startSuperstepMillis;
+ superstepSecsMap.put(new Long(cachedSuperstep),
+ superstepMillis / 1000.0d);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("masterThread: Coordination of superstep " +
+ cachedSuperstep + " took " +
+ superstepMillis / 1000.0d +
+ " seconds ended with state " + superstepState +
+ " and is now on superstep " +
+ bspServiceMaster.getSuperstep());
+ }
+ if (superstepCounterOn) {
+ String counterPrefix;
+ if (cachedSuperstep == -1) {
+ counterPrefix = "Vertex input superstep";
+ } else {
+ counterPrefix = "Superstep " + cachedSuperstep;
+ }
+ context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+ counterPrefix +
+ " (milliseconds)").
+ increment(superstepMillis);
+ }
- /**
- * The master algorithm. The algorithm should be able to withstand
- * failures and resume as necessary since the master may switch during a
- * job.
- */
- @Override
- public void run() {
- // Algorithm:
- // 1. Become the master
- // 2. If desired, restart from a manual checkpoint
- // 3. Run all supersteps until complete
- try {
- long startMillis = System.currentTimeMillis();
- long endMillis = 0;
- bspServiceMaster.setup();
- if (bspServiceMaster.becomeMaster() == true) {
- // Attempt to create InputSplits if necessary. Bail out if that fails.
- if (bspServiceMaster.getRestartedSuperstep() != BspService.UNSET_SUPERSTEP
- || bspServiceMaster.createInputSplits() != -1) {
- long setupMillis = (System.currentTimeMillis() - startMillis);
- context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
- "Setup (milliseconds)").
- increment(setupMillis);
- setupSecs = setupMillis / 1000.0d;
- SuperstepState superstepState = SuperstepState.INITIAL;
- long cachedSuperstep = BspService.UNSET_SUPERSTEP;
- while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
- long startSuperstepMillis = System.currentTimeMillis();
- cachedSuperstep = bspServiceMaster.getSuperstep();
- superstepState = bspServiceMaster.coordinateSuperstep();
- long superstepMillis = System.currentTimeMillis() -
- startSuperstepMillis;
- superstepSecsMap.put(new Long(cachedSuperstep),
- superstepMillis / 1000.0d);
- if (LOG.isInfoEnabled()) {
- LOG.info("masterThread: Coordination of superstep " +
- cachedSuperstep + " took " +
- superstepMillis / 1000.0d +
- " seconds ended with state " + superstepState +
- " and is now on superstep " +
- bspServiceMaster.getSuperstep());
- }
- if (superstepCounterOn) {
- String counterPrefix;
- if (cachedSuperstep == -1) {
- counterPrefix = "Vertex input superstep";
- } else {
- counterPrefix = "Superstep " + cachedSuperstep;
- }
- context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
- counterPrefix +
- " (milliseconds)").
- increment(superstepMillis);
- }
-
- // If a worker failed, restart from a known good superstep
- if (superstepState == SuperstepState.WORKER_FAILURE) {
- bspServiceMaster.restartFromCheckpoint(
- bspServiceMaster.getLastGoodCheckpoint());
- }
- endMillis = System.currentTimeMillis();
- }
- bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
- }
+ // If a worker failed, restart from a known good superstep
+ if (superstepState == SuperstepState.WORKER_FAILURE) {
+ bspServiceMaster.restartFromCheckpoint(
+ bspServiceMaster.getLastGoodCheckpoint());
}
- bspServiceMaster.cleanup();
- if (!superstepSecsMap.isEmpty()) {
- context.getCounter(
- GIRAPH_TIMERS_COUNTER_GROUP_NAME,
- "Shutdown (milliseconds)").
- increment(System.currentTimeMillis() - endMillis);
- if (LOG.isInfoEnabled()) {
- LOG.info("setup: Took " + setupSecs + " seconds.");
- }
- for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
- if (LOG.isInfoEnabled()) {
- if (entry.getKey().longValue() ==
- BspService.INPUT_SUPERSTEP) {
- LOG.info("vertex input superstep: Took " +
- entry.getValue() + " seconds.");
- } else {
- LOG.info("superstep " + entry.getKey() + ": Took " +
- entry.getValue() + " seconds.");
- }
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("shutdown: Took " +
- (System.currentTimeMillis() - endMillis) /
- 1000.0d + " seconds.");
- LOG.info("total: Took " +
- ((System.currentTimeMillis() / 1000.0d) -
- setupSecs) + " seconds.");
- }
- context.getCounter(
- GIRAPH_TIMERS_COUNTER_GROUP_NAME,
- "Total (milliseconds)").
- increment(System.currentTimeMillis() - startMillis);
+ endMillis = System.currentTimeMillis();
+ }
+ bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
+ }
+ }
+ bspServiceMaster.cleanup();
+ if (!superstepSecsMap.isEmpty()) {
+ context.getCounter(
+ GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+ "Shutdown (milliseconds)").
+ increment(System.currentTimeMillis() - endMillis);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("setup: Took " + setupSecs + " seconds.");
+ }
+ for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
+ if (LOG.isInfoEnabled()) {
+ if (entry.getKey().longValue() ==
+ BspService.INPUT_SUPERSTEP) {
+ LOG.info("vertex input superstep: Took " +
+ entry.getValue() + " seconds.");
+ } else {
+ LOG.info("superstep " + entry.getKey() + ": Took " +
+ entry.getValue() + " seconds.");
}
- } catch (Exception e) {
- LOG.error("masterThread: Master algorithm failed: ", e);
- throw new RuntimeException(e);
+ }
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("shutdown: Took " +
+ (System.currentTimeMillis() - endMillis) /
+ 1000.0d + " seconds.");
+ LOG.info("total: Took " +
+ ((System.currentTimeMillis() / 1000.0d) -
+ setupSecs) + " seconds.");
}
+ context.getCounter(
+ GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+ "Total (milliseconds)").
+ increment(System.currentTimeMillis() - startMillis);
+ }
+ } catch (IOException e) {
+ LOG.error("masterThread: Master algorithm failed with " +
+ "IOException ", e);
+ throw new IllegalStateException(e);
+ } catch (InterruptedException e) {
+ LOG.error("masterThread: Master algorithm failed with " +
+ "InterruptedException", e);
+ throw new IllegalStateException(e);
+ } catch (KeeperException e) {
+ LOG.error("masterThread: Master algorithm failed with " +
+ "KeeperException", e);
+ throw new IllegalStateException(e);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Thu Feb 16 22:12:31 2012
@@ -27,98 +27,107 @@ import java.util.Map;
/**
* Interface used by VertexReader to set the properties of a new vertex
* or mutate the graph.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class MutableVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends BasicVertex<I, V, E, M> {
- /**
- * Set the vertex id
- *
- * @param id Vertex id is set to this (instantiated by the user)
- */
- public abstract void setVertexId(I id);
-
- /**
- * Add an edge for this vertex (happens immediately)
- *
- * @param targetVertexId target vertex
- * @param edgeValue value of the edge
- * @return Return true if succeeded, false otherwise
- */
- public abstract boolean addEdge(I targetVertexId, E edgeValue);
-
- /**
- * Removes an edge for this vertex (happens immediately).
- *
- * @param targetVertexId the target vertex id of the edge to be removed.
- * @return the value of the edge which was removed (or null if no
- * edge existed to targetVertexId)
- */
- public abstract E removeEdge(I targetVertexId);
-
- /**
- * Create a vertex to add to the graph. Calls initialize() for the vertex
- * as well.
- *
- * @return A new vertex for adding to the graph
- */
- public BasicVertex<I, V, E, M> instantiateVertex(
- I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
- MutableVertex<I, V, E, M> mutableVertex =
- (MutableVertex<I, V, E, M>) BspUtils
- .<I, V, E, M>createVertex(getContext().getConfiguration());
- mutableVertex.setGraphState(getGraphState());
- mutableVertex.initialize(vertexId, vertexValue, edges, messages);
- return mutableVertex;
- }
-
- /**
- * Sends a request to create a vertex that will be available during the
- * next superstep. Use instantiateVertex() to do the instantiation.
- *
- * @param vertex User created vertex
- */
- public void addVertexRequest(BasicVertex<I, V, E, M> vertex)
- throws IOException {
- getGraphState().getWorkerCommunications().
- addVertexReq(vertex);
- }
-
- /**
- * Request to remove a vertex from the graph
- * (applied just prior to the next superstep).
- *
- * @param vertexId Id of the vertex to be removed.
- */
- public void removeVertexRequest(I vertexId) throws IOException {
- getGraphState().getWorkerCommunications().
- removeVertexReq(vertexId);
- }
-
- /**
- * Request to add an edge of a vertex in the graph
- * (processed just prior to the next superstep)
- *
- * @param sourceVertexId Source vertex id of edge
- * @param edge Edge to add
- */
- public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
- throws IOException {
- getGraphState().getWorkerCommunications().
- addEdgeReq(sourceVertexId, edge);
- }
-
- /**
- * Request to remove an edge of a vertex from the graph
- * (processed just prior to the next superstep).
- *
- * @param sourceVertexId Source vertex id of edge
- * @param destVertexId Destination vertex id of edge
- */
- public void removeEdgeRequest(I sourceVertexId, I destVertexId)
- throws IOException {
- getGraphState().getWorkerCommunications().
- removeEdgeReq(sourceVertexId, destVertexId);
- }
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BasicVertex<I, V, E, M> {
+ /**
+ * Set the vertex id
+ *
+ * @param id Vertex id is set to this (instantiated by the user)
+ */
+ public abstract void setVertexId(I id);
+
+ /**
+ * Add an edge for this vertex (happens immediately)
+ *
+ * @param targetVertexId target vertex
+ * @param edgeValue value of the edge
+ * @return Return true if succeeded, false otherwise
+ */
+ public abstract boolean addEdge(I targetVertexId, E edgeValue);
+
+ /**
+ * Removes an edge for this vertex (happens immediately).
+ *
+ * @param targetVertexId the target vertex id of the edge to be removed.
+ * @return the value of the edge which was removed (or null if no
+ * edge existed to targetVertexId)
+ */
+ public abstract E removeEdge(I targetVertexId);
+
+ /**
+ * Create a vertex to add to the graph. Calls initialize() for the vertex
+ * as well.
+ *
+ * @param vertexId Id of the new vertex.
+ * @param vertexValue Value of the new vertex.
+ * @param edges Map of edges to be added to this vertex.
+ * @param messages Messages to be added to the vertex (typically empty)
+ * @return A new vertex for adding to the graph
+ */
+ public BasicVertex<I, V, E, M> instantiateVertex(
+ I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+ MutableVertex<I, V, E, M> mutableVertex =
+ (MutableVertex<I, V, E, M>) BspUtils
+ .<I, V, E, M>createVertex(getContext().getConfiguration());
+ mutableVertex.setGraphState(getGraphState());
+ mutableVertex.initialize(vertexId, vertexValue, edges, messages);
+ return mutableVertex;
+ }
+
+ /**
+ * Sends a request to create a vertex that will be available during the
+ * next superstep. Use instantiateVertex() to do the instantiation.
+ *
+ * @param vertex User created vertex
+ */
+ public void addVertexRequest(BasicVertex<I, V, E, M> vertex)
+ throws IOException {
+ getGraphState().getWorkerCommunications().
+ addVertexReq(vertex);
+ }
+
+ /**
+ * Request to remove a vertex from the graph
+ * (applied just prior to the next superstep).
+ *
+ * @param vertexId Id of the vertex to be removed.
+ */
+ public void removeVertexRequest(I vertexId) throws IOException {
+ getGraphState().getWorkerCommunications().
+ removeVertexReq(vertexId);
+ }
+
+ /**
+ * Request to add an edge of a vertex in the graph
+ * (processed just prior to the next superstep)
+ *
+ * @param sourceVertexId Source vertex id of edge
+ * @param edge Edge to add
+ */
+ public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+ throws IOException {
+ getGraphState().getWorkerCommunications().
+ addEdgeReq(sourceVertexId, edge);
+ }
+
+ /**
+ * Request to remove an edge of a vertex from the graph
+ * (processed just prior to the next superstep).
+ *
+ * @param sourceVertexId Source vertex id of edge
+ * @param destVertexId Destination vertex id of edge
+ */
+ public void removeEdgeRequest(I sourceVertexId, I destVertexId)
+ throws IOException {
+ getGraphState().getWorkerCommunications().
+ removeEdgeReq(sourceVertexId, destVertexId);
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -30,92 +30,98 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
- * Default implementation of {@link AggregatorWriter}. Each line consists of
- * text and contains the aggregator name, the aggregator value and the
+ * Default implementation of {@link AggregatorWriter}. Each line consists of
+ * text and contains the aggregator name, the aggregator value and the
* aggregator class.
*/
-public class TextAggregatorWriter
- implements AggregatorWriter {
- /** The filename of the outputfile */
- public static final String FILENAME =
- "giraph.textAggregatorWriter.filename";
- /** The frequency of writing:
- * - NEVER: never write, files aren't created at all
- * - AT_THE_END: aggregators are written only when the computation is over
- * - int: i.e. 1 is every superstep, 2 every two supersteps and so on
- */
- public static final String FREQUENCY =
- "giraph.textAggregatorWriter.frequency";
- private static final String DEFAULT_FILENAME = "aggregatorValues";
- /** Signal for "never write" frequency */
- public static final int NEVER = 0;
- /** Signal for "write only the final values" frequency */
- public static final int AT_THE_END = -1;
- /** Handle to the outputfile */
- protected FSDataOutputStream output;
- private int frequency;
-
- @Override
- @SuppressWarnings("rawtypes")
- public void initialize(Context context, long attempt) throws IOException {
- Configuration conf = context.getConfiguration();
- frequency = conf.getInt(FREQUENCY, NEVER);
- String filename = conf.get(FILENAME, DEFAULT_FILENAME);
- if (frequency != NEVER) {
- Path p = new Path(filename+"_"+attempt);
- FileSystem fs = FileSystem.get(conf);
- if (fs.exists(p)) {
- throw new RuntimeException("aggregatorWriter file already" +
- " exists: " + p.getName());
- }
- output = fs.create(p);
- }
- }
+public class TextAggregatorWriter implements AggregatorWriter {
+ /** The filename of the outputfile */
+ public static final String FILENAME =
+ "giraph.textAggregatorWriter.filename";
+ /** Signal for "never write" frequency */
+ public static final int NEVER = 0;
+ /** Signal for "write only the final values" frequency */
+ public static final int AT_THE_END = -1;
+ /** The frequency of writing:
+ * - NEVER: never write, files aren't created at all
+ * - AT_THE_END: aggregators are written only when the computation is over
+ * - int: i.e. 1 is every superstep, 2 every two supersteps and so on
+ */
+ public static final String FREQUENCY =
+ "giraph.textAggregatorWriter.frequency";
+ /** Default filename for dumping aggregator values */
+ private static final String DEFAULT_FILENAME = "aggregatorValues";
+ /** Handle to the outputfile */
+ protected FSDataOutputStream output;
+ /** Write every "frequency" supersteps */
+ private int frequency;
- @Override
- final public void writeAggregator(
- Map<String, Aggregator<Writable>> aggregators,
- long superstep) throws IOException {
-
- if (shouldWrite(superstep)) {
- for (Entry<String, Aggregator<Writable>> a:
- aggregators.entrySet()) {
- output.writeUTF(aggregatorToString(a.getKey(),
- a.getValue(),
- superstep));
- }
- output.flush();
- }
- }
-
- /**
- * Implements the way an aggregator is converted into a String.
- * Override this if you want to implement your own text format.
- *
- * @param aggregatorName Name of the aggregator
- * @param a Aggregator
- * @param superstep Current superstep
- * @return The String representation for the aggregator
- */
- protected String aggregatorToString(String aggregatorName,
- Aggregator<Writable> a,
- long superstep) {
-
- return new StringBuilder("superstep=").append(superstep).append("\t")
- .append(aggregatorName).append("=").append(a.getAggregatedValue())
- .append("\t").append(a.getClass().getCanonicalName()).append("\n")
- .toString();
+ @Override
+ @SuppressWarnings("rawtypes")
+ public void initialize(Context context, long attempt) throws IOException {
+ Configuration conf = context.getConfiguration();
+ frequency = conf.getInt(FREQUENCY, NEVER);
+ String filename = conf.get(FILENAME, DEFAULT_FILENAME);
+ if (frequency != NEVER) {
+ Path p = new Path(filename + "_" + attempt);
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(p)) {
+ throw new RuntimeException("aggregatorWriter file already" +
+ " exists: " + p.getName());
+ }
+ output = fs.create(p);
}
+ }
- private boolean shouldWrite(long superstep) {
- return ((frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
- (frequency != NEVER && superstep % frequency == 0));
+ @Override
+ public final void writeAggregator(
+ Map<String, Aggregator<Writable>> aggregators,
+ long superstep) throws IOException {
+ if (shouldWrite(superstep)) {
+ for (Entry<String, Aggregator<Writable>> a:
+ aggregators.entrySet()) {
+ output.writeUTF(aggregatorToString(a.getKey(),
+ a.getValue(),
+ superstep));
+ }
+ output.flush();
}
+ }
+
+ /**
+ * Implements the way an aggregator is converted into a String.
+ * Override this if you want to implement your own text format.
+ *
+ * @param aggregatorName Name of the aggregator
+ * @param a Aggregator
+ * @param superstep Current superstep
+ * @return The String representation for the aggregator
+ */
+ protected String aggregatorToString(String aggregatorName,
+ Aggregator<Writable> a,
+ long superstep) {
+
+ return new StringBuilder("superstep=").append(superstep).append("\t")
+ .append(aggregatorName).append("=").append(a.getAggregatedValue())
+ .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+ .toString();
+ }
+
+ /**
+ * Should write this superstep?
+ *
+ * @param superstep Superstep to check
+ * @return True if should write, false otherwise
+ */
+ private boolean shouldWrite(long superstep) {
+ return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
+ (frequency != NEVER && superstep % frequency == 0);
+ }
- @Override
- public void close() throws IOException {
- if (output != null) {
- output.close();
- }
+ @Override
+ public void close() throws IOException {
+ if (output != null) {
+ output.close();
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java Thu Feb 16 22:12:31 2012
@@ -33,40 +33,36 @@ import org.apache.hadoop.io.WritableComp
* @param <M> Message value
*/
@SuppressWarnings("rawtypes")
-public interface VertexChanges<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> {
+public interface VertexChanges<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Get the added vertices for this particular vertex index from the previous
+ * superstep.
+ *
+ * @return List of vertices for this vertex index.
+ */
+ List<BasicVertex<I, V, E, M>> getAddedVertexList();
- /**
- * Get the added vertices for this particular vertex index from the previous
- * superstep.
- *
- * @return List of vertices for this vertex index.
- */
- List<BasicVertex<I, V, E, M>> getAddedVertexList();
+ /**
+ * Get the number of times this vertex was removed in the previous
+ * superstep.
+ *
+ * @return Count of time this vertex was removed in the previous superstep
+ */
+ int getRemovedVertexCount();
- /**
- * Get the number of times this vertex was removed in the previous
- * superstep.
- *
- * @return Count of time this vertex was removed in the previous superstep
- */
- int getRemovedVertexCount();
+ /**
+ * Get the added edges for this particular vertex index from the previous
+ * superstep
+ *
+ * @return List of added edges for this vertex index
+ */
+ List<Edge<I, E>> getAddedEdgeList();
- /**
- * Get the added edges for this particular vertex index from the previous
- * superstep
- *
- * @return List of added edges for this vertex index
- */
- List<Edge<I, E>> getAddedEdgeList();
-
- /**
- * Get the removed edges by their destination vertex index.
- *
- * @return List of destination edges for removal from this vertex index
- */
- List<I> getRemovedEdgeList();
+ /**
+ * Get the removed edges by their destination vertex index.
+ *
+ * @return List of destination edges for removal from this vertex index
+ */
+ List<I> getRemovedEdgeList();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java Thu Feb 16 22:12:31 2012
@@ -26,23 +26,22 @@ import org.apache.hadoop.io.WritableComp
/**
* Abstract class to extend for combining of messages sent to the same vertex.
*
- * @param <I extends Writable> index
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class VertexCombiner<I extends WritableComparable,
- M extends Writable> {
-
- /**
- * Combines message values for a particular vertex index.
- *
- * @param vertexIndex Index of the vertex getting these messages
- * @param messages Iterable of the messages to be combined
- * @return Iterable of the combined messages. The returned value cannot
- * be null and its size is required to be smaller or equal to
- * the size of the messages list.
- * @throws IOException
- */
- public abstract Iterable<M> combine(I vertexIndex,
- Iterable<M> messages) throws IOException;
+ M extends Writable> {
+ /**
+ * Combines message values for a particular vertex index.
+ *
+ * @param vertexIndex Index of the vertex getting these messages
+ * @param messages Iterable of the messages to be combined
+ * @return Iterable of the combined messages. The returned value cannot
+ * be null and its size is required to be smaller or equal to
+ * the size of the messages list.
+ * @throws IOException
+ */
+ public abstract Iterable<M> combine(I vertexIndex,
+ Iterable<M> messages) throws IOException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java Thu Feb 16 22:12:31 2012
@@ -22,45 +22,67 @@ package org.apache.giraph.graph;
* Simple immutable structure for storing a final vertex and edge count.
*/
public class VertexEdgeCount {
- /** Immutable vertices */
- private final long vertexCount;
- /** Immutable edges */
- private final long edgeCount;
-
- public VertexEdgeCount() {
- vertexCount = 0;
- edgeCount = 0;
- }
-
- public VertexEdgeCount(long vertexCount, long edgeCount) {
- this.vertexCount = vertexCount;
- this.edgeCount = edgeCount;
- }
-
- public long getVertexCount() {
- return vertexCount;
- }
-
- public long getEdgeCount() {
- return edgeCount;
- }
-
- public VertexEdgeCount incrVertexEdgeCount(
- VertexEdgeCount vertexEdgeCount) {
- return new VertexEdgeCount(
- vertexCount + vertexEdgeCount.getVertexCount(),
- edgeCount + vertexEdgeCount.getEdgeCount());
- }
-
- public VertexEdgeCount incrVertexEdgeCount(
- long vertexCount, long edgeCount) {
- return new VertexEdgeCount(
- this.vertexCount + vertexCount,
- this.edgeCount + edgeCount);
- }
-
- @Override
- public String toString() {
- return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
- }
+ /** Immutable vertices */
+ private final long vertexCount;
+ /** Immutable edges */
+ private final long edgeCount;
+
+ /**
+ * Default constructor.
+ */
+ public VertexEdgeCount() {
+ vertexCount = 0;
+ edgeCount = 0;
+ }
+
+ /**
+ * Constructor with initial values.
+ *
+ * @param vertexCount Final number of vertices.
+ * @param edgeCount Final number of edges.
+ */
+ public VertexEdgeCount(long vertexCount, long edgeCount) {
+ this.vertexCount = vertexCount;
+ this.edgeCount = edgeCount;
+ }
+
+ public long getVertexCount() {
+ return vertexCount;
+ }
+
+ public long getEdgeCount() {
+ return edgeCount;
+ }
+
+ /**
+ * Increment the both the vertex edge count with a {@link VertexEdgeCount}.
+ *
+ * @param vertexEdgeCount add both the vertices and edges of this object.
+ * @return New immutable object with the new vertex and edge counts.
+ */
+ public VertexEdgeCount incrVertexEdgeCount(
+ VertexEdgeCount vertexEdgeCount) {
+ return new VertexEdgeCount(
+ vertexCount + vertexEdgeCount.getVertexCount(),
+ edgeCount + vertexEdgeCount.getEdgeCount());
+ }
+
+ /**
+ * Increment the both the vertex edge count with primitives.
+ *
+ * @param vertexCount Add this many vertices.
+ * @param edgeCount Add this many edges.
+ * @return New immutable object with the new vertex and edge counts.
+ */
+ public VertexEdgeCount incrVertexEdgeCount(
+ long vertexCount, long edgeCount) {
+ return new VertexEdgeCount(
+ this.vertexCount + vertexCount,
+ this.edgeCount + edgeCount);
+ }
+
+ @Override
+ public String toString() {
+ return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -36,45 +36,45 @@ import java.util.List;
* @param <I> Vertex id
* @param <V> Vertex value
* @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class VertexInputFormat<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Logically split the vertices for a graph processing application.
+ *
+ * Each {@link InputSplit} is then assigned to a worker for processing.
+ *
+ * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+ * input files are not physically split into chunks. For e.g. a split could
+ * be <i><input-file-path, start, offset></i> tuple. The InputFormat
+ * also creates the {@link VertexReader} to read the {@link InputSplit}.
+ *
+ * Also, the number of workers is a hint given to the developer to try to
+ * intelligently determine how many splits to create (if this is
+ * adjustable) at runtime.
+ *
+ * @param context Context of the job
+ * @param numWorkers Number of workers used for this job
+ * @return an array of {@link InputSplit}s for the job.
+ */
+ public abstract List<InputSplit> getSplits(
+ JobContext context, int numWorkers)
+ throws IOException, InterruptedException;
- /**
- * Logically split the vertices for a graph processing application.
- *
- * Each {@link InputSplit} is then assigned to a worker for processing.
- *
- * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
- * input files are not physically split into chunks. For e.g. a split could
- * be <i><input-file-path, start, offset></i> tuple. The InputFormat
- * also creates the {@link VertexReader} to read the {@link InputSplit}.
- *
- * Also, the number of workers is a hint given to the developer to try to
- * intelligently determine how many splits to create (if this is
- * adjustable) at runtime.
- *
- * @param context Context of the job
- * @param numWorkers Number of workers used for this job
- * @return an array of {@link InputSplit}s for the job.
- */
- public abstract List<InputSplit> getSplits(
- JobContext context, int numWorkers)
- throws IOException, InterruptedException;
-
- /**
- * Create a vertex reader for a given split. The framework will call
- * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
- * the split is used.
- *
- * @param split the split to be read
- * @param context the information about the task
- * @return a new record reader
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract VertexReader<I, V, E, M> createVertexReader(
- InputSplit split,
- TaskAttemptContext context) throws IOException;
+ /**
+ * Create a vertex reader for a given split. The framework will call
+ * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param split the split to be read
+ * @param context the information about the task
+ * @return a new record reader
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract VertexReader<I, V, E, M> createVertexReader(
+ InputSplit split,
+ TaskAttemptContext context) throws IOException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Thu Feb 16 22:12:31 2012
@@ -36,87 +36,85 @@ import org.json.JSONObject;
* @param <M> Message value
*/
@SuppressWarnings("rawtypes")
-public class VertexMutations<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> implements VertexChanges<I, V, E, M> {
- /** List of added vertices during the last superstep */
- private final List<BasicVertex<I, V, E, M>> addedVertexList =
- new ArrayList<BasicVertex<I, V, E, M>>();
- /** Count of remove vertex requests */
- private int removedVertexCount = 0;
- /** List of added edges */
- private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
- /** List of removed edges */
- private final List<I> removedEdgeList = new ArrayList<I>();
-
- @Override
- public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
- return addedVertexList;
- }
-
- /**
- * Add a vertex mutation
- *
- * @param vertex Vertex to be added
- */
- public void addVertex(BasicVertex<I, V, E, M> vertex) {
- addedVertexList.add(vertex);
- }
-
- @Override
- public int getRemovedVertexCount() {
- return removedVertexCount;
- }
-
- /**
- * Removed a vertex mutation (increments a count)
- */
- public void removeVertex() {
- ++removedVertexCount;
- }
-
- @Override
- public List<Edge<I, E>> getAddedEdgeList() {
- return addedEdgeList;
- }
-
- /**
- * Add an edge to this vertex
- *
- * @param edge Edge to be added
- */
- public void addEdge(Edge<I, E> edge) {
- addedEdgeList.add(edge);
- }
-
- @Override
- public List<I> getRemovedEdgeList() {
- return removedEdgeList;
- }
-
- /**
- * Remove an edge on this vertex
- *
- * @param destinationVertexId Vertex index of the destination of the edge
- */
- public void removeEdge(I destinationVertexId) {
- removedEdgeList.add(destinationVertexId);
- }
-
- @Override
- public String toString() {
- JSONObject jsonObject = new JSONObject();
- try {
- jsonObject.put("added vertices", getAddedVertexList().toString());
- jsonObject.put("added edges", getAddedEdgeList().toString());
- jsonObject.put("removed vertex count", getRemovedVertexCount());
- jsonObject.put("removed edges", getRemovedEdgeList().toString());
- return jsonObject.toString();
- } catch (JSONException e) {
- throw new IllegalStateException("toString: Got a JSON exception",
- e);
- }
+public class VertexMutations<I extends WritableComparable,
+ V extends Writable, E extends Writable,
+ M extends Writable> implements VertexChanges<I, V, E, M> {
+ /** List of added vertices during the last superstep */
+ private final List<BasicVertex<I, V, E, M>> addedVertexList =
+ new ArrayList<BasicVertex<I, V, E, M>>();
+ /** Count of remove vertex requests */
+ private int removedVertexCount = 0;
+ /** List of added edges */
+ private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
+ /** List of removed edges */
+ private final List<I> removedEdgeList = new ArrayList<I>();
+
+ @Override
+ public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
+ return addedVertexList;
+ }
+
+ /**
+ * Add a vertex mutation
+ *
+ * @param vertex Vertex to be added
+ */
+ public void addVertex(BasicVertex<I, V, E, M> vertex) {
+ addedVertexList.add(vertex);
+ }
+
+ @Override
+ public int getRemovedVertexCount() {
+ return removedVertexCount;
+ }
+
+ /**
+ * Removed a vertex mutation (increments a count)
+ */
+ public void removeVertex() {
+ ++removedVertexCount;
+ }
+
+ @Override
+ public List<Edge<I, E>> getAddedEdgeList() {
+ return addedEdgeList;
+ }
+
+ /**
+ * Add an edge to this vertex
+ *
+ * @param edge Edge to be added
+ */
+ public void addEdge(Edge<I, E> edge) {
+ addedEdgeList.add(edge);
+ }
+
+ @Override
+ public List<I> getRemovedEdgeList() {
+ return removedEdgeList;
+ }
+
+ /**
+ * Remove an edge on this vertex
+ *
+ * @param destinationVertexId Vertex index of the destination of the edge
+ */
+ public void removeEdge(I destinationVertexId) {
+ removedEdgeList.add(destinationVertexId);
+ }
+
+ @Override
+ public String toString() {
+ JSONObject jsonObject = new JSONObject();
+ try {
+ jsonObject.put("added vertices", getAddedVertexList().toString());
+ jsonObject.put("added edges", getAddedEdgeList().toString());
+ jsonObject.put("removed vertex count", getRemovedVertexCount());
+ jsonObject.put("removed edges", getRemovedEdgeList().toString());
+ return jsonObject.toString();
+ } catch (JSONException e) {
+ throw new IllegalStateException("toString: Got a JSON exception",
+ e);
}
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.Outpu
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
@@ -38,45 +37,45 @@ import org.apache.hadoop.mapreduce.TaskA
*/
@SuppressWarnings("rawtypes")
public abstract class VertexOutputFormat<
- I extends WritableComparable, V extends Writable, E extends Writable> {
- /**
- * Create a vertex writer for a given split. The framework will call
- * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
- * the split is used.
- *
- * @param context the information about the task
- * @return a new vertex writer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract VertexWriter<I, V, E> createVertexWriter(
- TaskAttemptContext context) throws IOException, InterruptedException;
+ I extends WritableComparable, V extends Writable, E extends Writable> {
+ /**
+ * Create a vertex writer for a given split. The framework will call
+ * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+ * the split is used.
+ *
+ * @param context the information about the task
+ * @return a new vertex writer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract VertexWriter<I, V, E> createVertexWriter(
+ TaskAttemptContext context) throws IOException, InterruptedException;
- /**
- * Check for validity of the output-specification for the job.
- * (Copied from Hadoop OutputFormat)
- *
- * <p>This is to validate the output specification for the job when it is
- * a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.</p>
- *
- * @param context information about the job
- * @throws IOException when output should not be attempted
- */
- public abstract void checkOutputSpecs(JobContext context)
- throws IOException, InterruptedException;
+ /**
+ * Check for validity of the output-specification for the job.
+ * (Copied from Hadoop OutputFormat)
+ *
+ * <p>This is to validate the output specification for the job when it is
+ * a job is submitted. Typically checks that it does not already exist,
+ * throwing an exception when it already exists, so that output is not
+ * overwritten.</p>
+ *
+ * @param context information about the job
+ * @throws IOException when output should not be attempted
+ */
+ public abstract void checkOutputSpecs(JobContext context)
+ throws IOException, InterruptedException;
- /**
- * Get the output committer for this output format. This is responsible
- * for ensuring the output is committed correctly.
- * (Copied from Hadoop OutputFormat)
- *
- * @param context the task context
- * @return an output committer
- * @throws IOException
- * @throws InterruptedException
- */
- public abstract OutputCommitter getOutputCommitter(
- TaskAttemptContext context) throws IOException, InterruptedException;
+ /**
+ * Get the output committer for this output format. This is responsible
+ * for ensuring the output is committed correctly.
+ * (Copied from Hadoop OutputFormat)
+ *
+ * @param context the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract OutputCommitter getOutputCommitter(
+ TaskAttemptContext context) throws IOException, InterruptedException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java Thu Feb 16 22:12:31 2012
@@ -25,54 +25,63 @@ import org.apache.hadoop.mapreduce.TaskA
import java.io.IOException;
+/**
+ * Analogous to {@link RecordReader} for vertices. Will read the vertices
+ * from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
@SuppressWarnings("rawtypes")
-public interface VertexReader<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> {
- /**
- * Use the input split and context to setup reading the vertices.
- * Guaranteed to be called prior to any other function.
- *
- * @param inputSplit
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException;
-
- /**
- *
- * @return false iff there are no more vertices
- * @throws IOException
- * @throws InterruptedException
- */
- boolean nextVertex() throws IOException, InterruptedException;
-
- /**
- *
- * @return the current vertex which has been read. nextVertex() should be called first.
- * @throws IOException
- * @throws InterruptedException
- */
- BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
-
- /**
- * Close this {@link VertexReader} to future operations.
- *
- * @throws IOException
- */
- void close() throws IOException;
-
- /**
- * How much of the input has the {@link VertexReader} consumed i.e.
- * has been processed by?
- *
- * @return Progress from <code>0.0</code> to <code>1.0</code>.
- * @throws IOException
- * @throws InterruptedException
- */
- float getProgress() throws IOException, InterruptedException;
+public interface VertexReader<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Use the input split and context to setup reading the vertices.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param inputSplit Input split to be used for reading vertices.
+ * @param context Context from the task.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException;
+
+ /**
+ *
+ * @return false iff there are no more vertices
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean nextVertex() throws IOException, InterruptedException;
+
+ /**
+ * Get the current vertex.
+ *
+ * @return the current vertex which has been read.
+ * nextVertex() should be called first.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ BasicVertex<I, V, E, M> getCurrentVertex()
+ throws IOException, InterruptedException;
+
+ /**
+ * Close this {@link VertexReader} to future operations.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+ /**
+ * How much of the input has the {@link VertexReader} consumed i.e.
+ * has been processed by?
+ *
+ * @return Progress from <code>0.0</code> to <code>1.0</code>.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ float getProgress() throws IOException, InterruptedException;
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Thu Feb 16 22:12:31 2012
@@ -31,110 +31,113 @@ import java.util.List;
* Default implementation of how to resolve vertex creation/removal, messages
* to nonexistent vertices, etc.
*
- * @param <I>
- * @param <V>
- * @param <E>
- * @param <M>
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class VertexResolver<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- implements BasicVertexResolver<I, V, E, M>, Configurable {
- /** Configuration */
- private Configuration conf = null;
-
- private GraphState<I,V,E,M> graphState;
-
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(VertexResolver.class);
-
- @Override
- public BasicVertex<I, V, E, M> resolve(
- I vertexId,
- BasicVertex<I, V, E, M> vertex,
- VertexChanges<I, V, E, M> vertexChanges,
- Iterable<M> messages) {
- // Default algorithm:
- // 1. If the vertex exists, first prune the edges
- // 2. If vertex removal desired, remove the vertex.
- // 3. If creation of vertex desired, pick first vertex
- // 4. If vertex doesn't exist, but got messages, create
- // 5. If edge addition, add the edges
- if (vertex != null) {
- if (vertexChanges != null) {
- List<I> removedEdgeList = vertexChanges.getRemovedEdgeList();
- for (I removedDestVertex : removedEdgeList) {
- E removeEdge =
- ((MutableVertex<I, V, E, M>) vertex).removeEdge(
- removedDestVertex);
- if (removeEdge == null) {
- LOG.warn("resolve: Failed to remove edge with " +
- "destination " + removedDestVertex + "on " +
- vertex + " since it doesn't exist.");
- }
- }
- if (vertexChanges.getRemovedVertexCount() > 0) {
- vertex = null;
- }
- }
+ E extends Writable, M extends Writable>
+ implements BasicVertexResolver<I, V, E, M>, Configurable {
+ /** Class logger */
+ private static final Logger LOG = Logger.getLogger(VertexResolver.class);
+ /** Configuration */
+ private Configuration conf = null;
+ /** Stored graph state */
+ private GraphState<I, V, E, M> graphState;
+
+ @Override
+ public BasicVertex<I, V, E, M> resolve(
+ I vertexId,
+ BasicVertex<I, V, E, M> vertex,
+ VertexChanges<I, V, E, M> vertexChanges,
+ Iterable<M> messages) {
+ // Default algorithm:
+ // 1. If the vertex exists, first prune the edges
+ // 2. If vertex removal desired, remove the vertex.
+ // 3. If creation of vertex desired, pick first vertex
+ // 4. If vertex doesn't exist, but got messages, create
+ // 5. If edge addition, add the edges
+ if (vertex != null) {
+ if (vertexChanges != null) {
+ List<I> removedEdgeList = vertexChanges.getRemovedEdgeList();
+ for (I removedDestVertex : removedEdgeList) {
+ E removeEdge =
+ ((MutableVertex<I, V, E, M>) vertex).removeEdge(
+ removedDestVertex);
+ if (removeEdge == null) {
+ LOG.warn("resolve: Failed to remove edge with " +
+ "destination " + removedDestVertex + "on " +
+ vertex + " since it doesn't exist.");
+ }
}
-
- if (vertex == null) {
- if (vertexChanges != null) {
- if (!vertexChanges.getAddedVertexList().isEmpty()) {
- vertex = vertexChanges.getAddedVertexList().get(0);
- }
- }
- if (vertex == null && messages != null
- && !Iterables.isEmpty(messages)) {
- vertex = instantiateVertex();
- vertex.initialize(vertexId,
- BspUtils.<V>createVertexValue(getConf()),
- null,
- messages);
- }
- } else {
- if ((vertexChanges != null) &&
- (!vertexChanges.getAddedVertexList().isEmpty())) {
- LOG.warn("resolve: Tried to add a vertex with id = " +
- vertex.getVertexId() + " when one already " +
- "exists. Ignoring the add vertex request.");
- }
- }
-
- if (vertexChanges != null &&
- !vertexChanges.getAddedEdgeList().isEmpty()) {
- MutableVertex<I, V, E, M> mutableVertex =
- (MutableVertex<I, V, E, M>) vertex;
- for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
- edge.setConf(getConf());
- mutableVertex.addEdge(edge.getDestVertexId(),
- edge.getEdgeValue());
- }
+ if (vertexChanges.getRemovedVertexCount() > 0) {
+ vertex = null;
}
-
- return vertex;
+ }
}
- @Override
- public BasicVertex<I, V, E, M> instantiateVertex() {
- BasicVertex<I, V, E, M> vertex =
- BspUtils.<I, V, E, M>createVertex(getConf());
- vertex.setGraphState(graphState);
- return vertex;
+ if (vertex == null) {
+ if (vertexChanges != null) {
+ if (!vertexChanges.getAddedVertexList().isEmpty()) {
+ vertex = vertexChanges.getAddedVertexList().get(0);
+ }
+ }
+ if (vertex == null && messages != null && !Iterables.isEmpty(messages)) {
+ vertex = instantiateVertex();
+ vertex.initialize(vertexId,
+ BspUtils.<V>createVertexValue(getConf()),
+ null,
+ messages);
+ }
+ } else {
+ if ((vertexChanges != null) &&
+ (!vertexChanges.getAddedVertexList().isEmpty())) {
+ LOG.warn("resolve: Tried to add a vertex with id = " +
+ vertex.getVertexId() + " when one already " +
+ "exists. Ignoring the add vertex request.");
+ }
}
- @Override
- public Configuration getConf() {
- return conf;
+ if (vertexChanges != null &&
+ !vertexChanges.getAddedEdgeList().isEmpty()) {
+ MutableVertex<I, V, E, M> mutableVertex =
+ (MutableVertex<I, V, E, M>) vertex;
+ for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
+ edge.setConf(getConf());
+ mutableVertex.addEdge(edge.getDestVertexId(),
+ edge.getEdgeValue());
+ }
}
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ return vertex;
+ }
- public void setGraphState(GraphState<I, V, E, M> graphState) {
- this.graphState = graphState;
- }
+ @Override
+ public BasicVertex<I, V, E, M> instantiateVertex() {
+ BasicVertex<I, V, E, M> vertex =
+ BspUtils.<I, V, E, M>createVertex(getConf());
+ vertex.setGraphState(graphState);
+ return vertex;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Set the graph state.
+ *
+ * @param graphState Graph state saved.
+ */
+ public void setGraphState(GraphState<I, V, E, M> graphState) {
+ this.graphState = graphState;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java Thu Feb 16 22:12:31 2012
@@ -32,36 +32,34 @@ import org.apache.hadoop.mapreduce.TaskA
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public interface VertexWriter<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable> {
- /**
- * Use the context to setup writing the vertices.
- * Guaranteed to be called prior to any other function.
- *
- * @param context
- * @throws IOException
- */
- void initialize(TaskAttemptContext context) throws IOException;
+public interface VertexWriter<I extends WritableComparable, V extends Writable,
+ E extends Writable> {
+ /**
+ * Use the context to setup writing the vertices.
+ * Guaranteed to be called prior to any other function.
+ *
+ * @param context Context used to write the vertices.
+ * @throws IOException
+ */
+ void initialize(TaskAttemptContext context) throws IOException;
- /**
- * Writes the next vertex and associated data
- *
- * @param vertex set the properties of this vertex
- * @throws IOException
- * @throws InterruptedException
- */
- void writeVertex(BasicVertex<I, V, E, ?> vertex)
- throws IOException, InterruptedException;
+ /**
+ * Writes the next vertex and associated data
+ *
+ * @param vertex set the properties of this vertex
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void writeVertex(BasicVertex<I, V, E, ?> vertex)
+ throws IOException, InterruptedException;
- /**
- * Close this {@link VertexWriter} to future operations.
- *
- * @param context the context of the task
- * @throws IOException
- * @throws InterruptedException
- */
- void close(TaskAttemptContext context)
- throws IOException, InterruptedException;
+ /**
+ * Close this {@link VertexWriter} to future operations.
+ *
+ * @param context the context of the task
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void close(TaskAttemptContext context)
+ throws IOException, InterruptedException;
}