You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [13/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/example...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Feb 16 22:12:31 2012
@@ -34,279 +34,304 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;
 
+/**
+ * Optimized vertex implementation for
+ * <LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ */
 public abstract class LongDoubleFloatDoubleVertex extends
-        MutableVertex<LongWritable, DoubleWritable, FloatWritable,
-        DoubleWritable> {
-    /** Class logger */
-    private static final Logger LOG =
-        Logger.getLogger(LongDoubleFloatDoubleVertex.class);
-
-    private long vertexId;
-    private double vertexValue;
-    private OpenLongFloatHashMap verticesWithEdgeValues =
-        new OpenLongFloatHashMap();
-    private DoubleArrayList messageList = new DoubleArrayList();
-
-    @Override
-    public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
-                           Map<LongWritable, FloatWritable> edgesW,
-                           Iterable<DoubleWritable> messagesW) {
-        if (vertexIdW != null ) {
-            vertexId = vertexIdW.get();
-        }
-        if (vertexValueW != null) {
-            vertexValue = vertexValueW.get();
-        }
-        if (edgesW != null) {
-            for (Map.Entry<LongWritable, FloatWritable> entry :
-                    edgesW.entrySet()) {
-                verticesWithEdgeValues.put(entry.getKey().get(),
-                                           entry.getValue().get());
-            }
-        }
-        if (messagesW != null) {
-            for(DoubleWritable m : messagesW) {
-                messageList.add(m.get());
-            }
-        }
-    }
-
-    @Override
-    public final boolean addEdge(LongWritable targetId,
-                                 FloatWritable edgeValue) {
-        if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("addEdge: Vertex=" + vertexId +
-                        ": already added an edge value for dest vertex id " +
-                        targetId.get());
-            }
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    @Override
-    public FloatWritable removeEdge(LongWritable targetVertexId) {
-        long target = targetVertexId.get();
-        if (verticesWithEdgeValues.containsKey(target)) {
-            float value = verticesWithEdgeValues.get(target);
-            verticesWithEdgeValues.removeKey(target);
-            return new FloatWritable(value);
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public final void setVertexId(LongWritable vertexId) {
-        this.vertexId = vertexId.get();
-    }
-
-    @Override
-    public final LongWritable getVertexId() {
-        // TODO: possibly not make new objects every time?
-        return new LongWritable(vertexId);
-    }
-
-    @Override
-    public final DoubleWritable getVertexValue() {
-        return new DoubleWritable(vertexValue);
-    }
-
-    @Override
-    public final void setVertexValue(DoubleWritable vertexValue) {
-        this.vertexValue = vertexValue.get();
-    }
-
-    @Override
-    public final void sendMsg(LongWritable id, DoubleWritable msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                    "sendMsg: Cannot send null message to " + id);
-        }
-        getGraphState().getWorkerCommunications().sendMessageReq(id, msg);
-    }
-
-    @Override
-    public final void sendMsgToAllEdges(final DoubleWritable msg) {
-        if (msg == null) {
-            throw new IllegalArgumentException(
-                "sendMsgToAllEdges: Cannot send null message to all edges");
-        }
-        final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
-            DoubleWritable> vertex = this;
-        verticesWithEdgeValues.forEachKey(new LongProcedure() {
-            @Override
-            public boolean apply(long destVertexId) {
-                vertex.sendMsg(new LongWritable(destVertexId), msg);
-                return true;
-            }
-        });
-    }
-
-    @Override
-    public long getNumVertices() {
-        return getGraphState().getNumVertices();
-    }
-
-    @Override
-    public long getNumEdges() {
-        return getGraphState().getNumEdges();
-    }
-
-    @Override
-    public Iterator<LongWritable> iterator() {
-        final long[] destVertices = verticesWithEdgeValues.keys().elements();
-        final int destVerticesSize = verticesWithEdgeValues.size();
-        return new Iterator<LongWritable>() {
-            int offset = 0;
-            @Override public boolean hasNext() {
-                return offset < destVerticesSize;
-            }
-
-            @Override public LongWritable next() {
-                return new LongWritable(destVertices[offset++]);
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException(
-                    "Mutation disallowed for edge list via iterator");
-            }
-        };
-    }
-
-    @Override
-    public FloatWritable getEdgeValue(LongWritable targetVertexId) {
-        return new FloatWritable(
-            verticesWithEdgeValues.get(targetVertexId.get()));
-    }
-
-    @Override
-    public boolean hasEdge(LongWritable targetVertexId) {
-        return verticesWithEdgeValues.containsKey(targetVertexId.get());
-    }
-
-    @Override
-    public int getNumOutEdges() {
-        return verticesWithEdgeValues.size();
-    }
-
-    @Override
-    public long getSuperstep() {
-        return getGraphState().getSuperstep();
-    }
-
-    @Override
-    final public void readFields(DataInput in) throws IOException {
-        vertexId = in.readLong();
-        vertexValue = in.readDouble();
-        long edgeMapSize = in.readLong();
-        for (long i = 0; i < edgeMapSize; ++i) {
-            long destVertexId = in.readLong();
-            float edgeValue = in.readFloat();
-            verticesWithEdgeValues.put(destVertexId, edgeValue);
-        }
-        long msgListSize = in.readLong();
-        for (long i = 0; i < msgListSize; ++i) {
-            messageList.add(in.readDouble());
-        }
-        halt = in.readBoolean();
-    }
-
-    @Override
-    public final void write(final DataOutput out) throws IOException {
-        out.writeLong(vertexId);
-        out.writeDouble(vertexValue);
-        out.writeLong(verticesWithEdgeValues.size());
-        verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
-            @Override
-            public boolean apply(long destVertexId, float edgeValue) {
-                try {
-                    out.writeLong(destVertexId);
-                    out.writeFloat(edgeValue);
-                } catch (IOException e) {
-                    throw new IllegalStateException(
-                        "apply: IOException when not allowed", e);
-                }
-                return true;
-            }
-        });
-        out.writeLong(messageList.size());
-        messageList.forEach(new DoubleProcedure() {
-             @Override
-             public boolean apply(double message) {
-                 try {
-                     out.writeDouble(message);
-                 } catch (IOException e) {
-                     throw new IllegalStateException(
-                         "apply: IOException when not allowed", e);
-                 }
-                 return true;
-             }
-        });
-        out.writeBoolean(halt);
-    }
-
-    @Override
-    void putMessages(Iterable<DoubleWritable> messages) {
-        messageList.clear();
-        for (DoubleWritable message : messages) {
-            messageList.add(message.get());
-        }
-    }
-
-    @Override
-    void releaseResources() {
-        // Hint to GC to free the messages
-        messageList.clear();
+    MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(LongDoubleFloatDoubleVertex.class);
+  /** Long vertex id */
+  private long vertexId;
+  /** Double vertex value */
+  private double vertexValue;
+  /** Stores the edges */
+  private OpenLongFloatHashMap verticesWithEdgeValues =
+      new OpenLongFloatHashMap();
+  /** Message list storage */
+  private DoubleArrayList messageList = new DoubleArrayList();
+
+  @Override
+  public void initialize(LongWritable vertexIdW, DoubleWritable vertexValueW,
+      Map<LongWritable, FloatWritable> edgesW,
+      Iterable<DoubleWritable> messagesW) {
+    if (vertexIdW != null) {
+      vertexId = vertexIdW.get();
+    }
+    if (vertexValueW != null) {
+      vertexValue = vertexValueW.get();
+    }
+    if (edgesW != null) {
+      for (Map.Entry<LongWritable, FloatWritable> entry :
+        edgesW.entrySet()) {
+        verticesWithEdgeValues.put(entry.getKey().get(),
+            entry.getValue().get());
+      }
+    }
+    if (messagesW != null) {
+      for (DoubleWritable m : messagesW) {
+        messageList.add(m.get());
+      }
+    }
+  }
+
+  @Override
+  public final boolean addEdge(LongWritable targetId,
+      FloatWritable edgeValue) {
+    if (verticesWithEdgeValues.put(targetId.get(), edgeValue.get())) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("addEdge: Vertex=" + vertexId +
+            ": already added an edge value for dest vertex id " +
+            targetId.get());
+      }
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public FloatWritable removeEdge(LongWritable targetVertexId) {
+    long target = targetVertexId.get();
+    if (verticesWithEdgeValues.containsKey(target)) {
+      float value = verticesWithEdgeValues.get(target);
+      verticesWithEdgeValues.removeKey(target);
+      return new FloatWritable(value);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public final void setVertexId(LongWritable vertexId) {
+    this.vertexId = vertexId.get();
+  }
+
+  @Override
+  public final LongWritable getVertexId() {
+    // TODO: possibly not make new objects every time?
+    return new LongWritable(vertexId);
+  }
+
+  @Override
+  public final DoubleWritable getVertexValue() {
+    return new DoubleWritable(vertexValue);
+  }
+
+  @Override
+  public final void setVertexValue(DoubleWritable vertexValue) {
+    this.vertexValue = vertexValue.get();
+  }
+
+  @Override
+  public final void sendMsg(LongWritable id, DoubleWritable msg) {
+    if (msg == null) {
+      throw new IllegalArgumentException(
+          "sendMsg: Cannot send null message to " + id);
+    }
+    getGraphState().getWorkerCommunications().sendMessageReq(id, msg);
+  }
+
+  @Override
+  public final void sendMsgToAllEdges(final DoubleWritable msg) {
+    if (msg == null) {
+      throw new IllegalArgumentException(
+          "sendMsgToAllEdges: Cannot send null message to all edges");
+    }
+    final MutableVertex<LongWritable, DoubleWritable, FloatWritable,
+    DoubleWritable> vertex = this;
+    verticesWithEdgeValues.forEachKey(new LongProcedure() {
+      @Override
+      public boolean apply(long destVertexId) {
+        vertex.sendMsg(new LongWritable(destVertexId), msg);
+        return true;
+      }
+    });
+  }
+
+  @Override
+  public long getNumVertices() {
+    return getGraphState().getNumVertices();
+  }
+
+  @Override
+  public long getNumEdges() {
+    return getGraphState().getNumEdges();
+  }
+
+  @Override
+  public Iterator<LongWritable> iterator() {
+    final long[] destVertices = verticesWithEdgeValues.keys().elements();
+    final int destVerticesSize = verticesWithEdgeValues.size();
+    return new Iterator<LongWritable>() {
+      private int offset = 0;
+      @Override public boolean hasNext() {
+        return offset < destVerticesSize;
+      }
+
+      @Override public LongWritable next() {
+        return new LongWritable(destVertices[offset++]);
+      }
+
+      @Override public void remove() {
+        throw new UnsupportedOperationException(
+            "Mutation disallowed for edge list via iterator");
+      }
+    };
+  }
+
+  @Override
+  public FloatWritable getEdgeValue(LongWritable targetVertexId) {
+    return new FloatWritable(
+        verticesWithEdgeValues.get(targetVertexId.get()));
+  }
+
+  @Override
+  public boolean hasEdge(LongWritable targetVertexId) {
+    return verticesWithEdgeValues.containsKey(targetVertexId.get());
+  }
+
+  @Override
+  public int getNumOutEdges() {
+    return verticesWithEdgeValues.size();
+  }
+
+  @Override
+  public long getSuperstep() {
+    return getGraphState().getSuperstep();
+  }
+
+  @Override
+  public final void readFields(DataInput in) throws IOException {
+    vertexId = in.readLong();
+    vertexValue = in.readDouble();
+    long edgeMapSize = in.readLong();
+    for (long i = 0; i < edgeMapSize; ++i) {
+      long destVertexId = in.readLong();
+      float edgeValue = in.readFloat();
+      verticesWithEdgeValues.put(destVertexId, edgeValue);
+    }
+    long msgListSize = in.readLong();
+    for (long i = 0; i < msgListSize; ++i) {
+      messageList.add(in.readDouble());
+    }
+    halt = in.readBoolean();
+  }
+
+  @Override
+  public final void write(final DataOutput out) throws IOException {
+    out.writeLong(vertexId);
+    out.writeDouble(vertexValue);
+    out.writeLong(verticesWithEdgeValues.size());
+    verticesWithEdgeValues.forEachPair(new LongFloatProcedure() {
+      @Override
+      public boolean apply(long destVertexId, float edgeValue) {
+        try {
+          out.writeLong(destVertexId);
+          out.writeFloat(edgeValue);
+        } catch (IOException e) {
+          throw new IllegalStateException(
+              "apply: IOException when not allowed", e);
+        }
+        return true;
+      }
+    });
+    out.writeLong(messageList.size());
+    messageList.forEach(new DoubleProcedure() {
+      @Override
+      public boolean apply(double message) {
+        try {
+          out.writeDouble(message);
+        } catch (IOException e) {
+          throw new IllegalStateException(
+              "apply: IOException when not allowed", e);
+        }
+        return true;
+      }
+    });
+    out.writeBoolean(halt);
+  }
+
+  @Override
+  void putMessages(Iterable<DoubleWritable> messages) {
+    messageList.clear();
+    for (DoubleWritable message : messages) {
+      messageList.add(message.get());
+    }
+  }
+
+  @Override
+  void releaseResources() {
+    // Hint to GC to free the messages
+    messageList.clear();
+  }
+
+  @Override
+  public Iterable<DoubleWritable> getMessages() {
+    return new UnmodifiableDoubleWritableIterable(messageList);
+  }
+
+  @Override
+  public String toString() {
+    return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
+        ",#edges=" + getNumOutEdges() + ")";
+  }
+
+  /**
+   * Helper iterable over the messages.
+   */
+  private class UnmodifiableDoubleWritableIterable
+    implements Iterable<DoubleWritable> {
+    /** Backing store of messages */
+    private final DoubleArrayList elementList;
+
+    /**
+     * Constructor.
+     *
+     * @param elementList Backing store of element list.
+     */
+    public UnmodifiableDoubleWritableIterable(
+        DoubleArrayList elementList) {
+      this.elementList = elementList;
+    }
+
+    @Override
+    public Iterator<DoubleWritable> iterator() {
+      return new UnmodifiableDoubleWritableIterator(
+          elementList);
+    }
+  }
+
+  /**
+   * Iterator over the messages.
+   */
+  private class UnmodifiableDoubleWritableIterator
+      extends UnmodifiableIterator<DoubleWritable> {
+    /** Double backing list */
+    private final DoubleArrayList elementList;
+    /** Offset into the backing list */
+    private int offset = 0;
+
+    /**
+     * Constructor.
+     *
+     * @param elementList Backing store of element list.
+     */
+    UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
+      this.elementList = elementList;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return offset < elementList.size();
     }
 
     @Override
-    public Iterable<DoubleWritable> getMessages() {
-        return new UnmodifiableDoubleWritableIterable(messageList);
-    }
-
-    @Override
-    public String toString() {
-        return "Vertex(id=" + getVertexId() + ",value=" + getVertexValue() +
-                ",#edges=" + getNumOutEdges() + ")";
-    }
-
-    private class UnmodifiableDoubleWritableIterable
-            implements Iterable<DoubleWritable> {
-
-        private final DoubleArrayList elementList;
-
-        public UnmodifiableDoubleWritableIterable(
-                DoubleArrayList elementList) {
-            this.elementList = elementList;
-        }
-
-        @Override
-        public Iterator<DoubleWritable> iterator() {
-            return new UnmodifiableDoubleWritableIterator(
-                    elementList);
-        }
-    }
-
-    private class UnmodifiableDoubleWritableIterator
-            extends UnmodifiableIterator<DoubleWritable> {
-        private final DoubleArrayList elementList;
-        private int offset = 0;
-
-        UnmodifiableDoubleWritableIterator(DoubleArrayList elementList) {
-            this.elementList = elementList;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return offset < elementList.size();
-        }
-
-        @Override
-        public DoubleWritable next() {
-            return new DoubleWritable(elementList.get(offset++));
-        }
+    public DoubleWritable next() {
+      return new DoubleWritable(elementList.get(offset++));
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MasterThread.java Thu Feb 16 22:12:31 2012
@@ -18,6 +18,7 @@
 
 package org.apache.giraph.graph;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
@@ -29,152 +30,166 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Master thread that will coordinate the activities of the tasks.  It runs
  * on all task processes, however, will only execute its algorithm if it knows
  * it is the "leader" from ZooKeeper.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class MasterThread<I extends WritableComparable,
-                          V extends Writable,
-                          E extends Writable,
-                          M extends Writable> extends Thread {
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(MasterThread.class);
-    /** Reference to shared BspService */
-    private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
-    /** Context (for counters) */
-    private final Context context;
-    /** Use superstep counters? */
-    private final boolean superstepCounterOn;
-    /** Setup seconds */
-    private double setupSecs = 0d;
-    /** Superstep timer (in seconds) map */
-    private final Map<Long, Double> superstepSecsMap =
-        new TreeMap<Long, Double>();
-
-    /** Counter group name for the Giraph timers */
-    public String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
-
-    /**
-     *  Constructor.
-     *
-     *  @param bspServiceMaster Master that already exists and setup() has
-     *         been called.
-     */
-    MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
-                 Context context) {
-        super(MasterThread.class.getName());
-        this.bspServiceMaster = bspServiceMaster;
-        this.context = context;
-        superstepCounterOn = context.getConfiguration().getBoolean(
-            GiraphJob.USE_SUPERSTEP_COUNTERS,
-            GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
-    }
+public class MasterThread<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> extends Thread {
+  /** Counter group name for the Giraph timers */
+  public static final String GIRAPH_TIMERS_COUNTER_GROUP_NAME = "Giraph Timers";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(MasterThread.class);
+  /** Reference to shared BspService */
+  private CentralizedServiceMaster<I, V, E, M> bspServiceMaster = null;
+  /** Context (for counters) */
+  private final Context context;
+  /** Use superstep counters? */
+  private final boolean superstepCounterOn;
+  /** Setup seconds */
+  private double setupSecs = 0d;
+  /** Superstep timer (in seconds) map */
+  private final Map<Long, Double> superstepSecsMap =
+      new TreeMap<Long, Double>();
+
+  /**
+   * Constructor.
+   *
+   * @param bspServiceMaster Master that already exists and setup() has
+   *        been called.
+   * @param context Context from the Mapper.
+   */
+  MasterThread(BspServiceMaster<I, V, E, M> bspServiceMaster,
+      Context context) {
+    super(MasterThread.class.getName());
+    this.bspServiceMaster = bspServiceMaster;
+    this.context = context;
+    superstepCounterOn = context.getConfiguration().getBoolean(
+        GiraphJob.USE_SUPERSTEP_COUNTERS,
+        GiraphJob.USE_SUPERSTEP_COUNTERS_DEFAULT);
+  }
+
+  /**
+   * The master algorithm.  The algorithm should be able to withstand
+   * failures and resume as necessary since the master may switch during a
+   * job.
+   */
+  @Override
+  public void run() {
+    // Algorithm:
+    // 1. Become the master
+    // 2. If desired, restart from a manual checkpoint
+    // 3. Run all supersteps until complete
+    try {
+      long startMillis = System.currentTimeMillis();
+      long endMillis = 0;
+      bspServiceMaster.setup();
+      if (bspServiceMaster.becomeMaster()) {
+        // Attempt to create InputSplits if necessary. Bail out if that fails.
+        if (bspServiceMaster.getRestartedSuperstep() !=
+            BspService.UNSET_SUPERSTEP ||
+            bspServiceMaster.createInputSplits() != -1) {
+          long setupMillis = System.currentTimeMillis() - startMillis;
+          context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+              "Setup (milliseconds)").
+              increment(setupMillis);
+          setupSecs = setupMillis / 1000.0d;
+          SuperstepState superstepState = SuperstepState.INITIAL;
+          long cachedSuperstep = BspService.UNSET_SUPERSTEP;
+          while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
+            long startSuperstepMillis = System.currentTimeMillis();
+            cachedSuperstep = bspServiceMaster.getSuperstep();
+            superstepState = bspServiceMaster.coordinateSuperstep();
+            long superstepMillis = System.currentTimeMillis() -
+                startSuperstepMillis;
+            superstepSecsMap.put(new Long(cachedSuperstep),
+                superstepMillis / 1000.0d);
+            if (LOG.isInfoEnabled()) {
+              LOG.info("masterThread: Coordination of superstep " +
+                  cachedSuperstep + " took " +
+                  superstepMillis / 1000.0d +
+                  " seconds ended with state " + superstepState +
+                  " and is now on superstep " +
+                  bspServiceMaster.getSuperstep());
+            }
+            if (superstepCounterOn) {
+              String counterPrefix;
+              if (cachedSuperstep == -1) {
+                counterPrefix = "Vertex input superstep";
+              } else {
+                counterPrefix = "Superstep " + cachedSuperstep;
+              }
+              context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+                  counterPrefix +
+                  " (milliseconds)").
+                  increment(superstepMillis);
+            }
 
-    /**
-     * The master algorithm.  The algorithm should be able to withstand
-     * failures and resume as necessary since the master may switch during a
-     * job.
-     */
-    @Override
-    public void run() {
-        // Algorithm:
-        // 1. Become the master
-        // 2. If desired, restart from a manual checkpoint
-        // 3. Run all supersteps until complete
-        try {
-            long startMillis = System.currentTimeMillis();
-            long endMillis = 0;
-            bspServiceMaster.setup();
-            if (bspServiceMaster.becomeMaster() == true) {
-                // Attempt to create InputSplits if necessary. Bail out if that fails.
-                if (bspServiceMaster.getRestartedSuperstep() != BspService.UNSET_SUPERSTEP
-                        || bspServiceMaster.createInputSplits() != -1) {
-                    long setupMillis = (System.currentTimeMillis() - startMillis);
-                    context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                            "Setup (milliseconds)").
-                            increment(setupMillis);
-                    setupSecs = setupMillis / 1000.0d;
-                    SuperstepState superstepState = SuperstepState.INITIAL;
-                    long cachedSuperstep = BspService.UNSET_SUPERSTEP;
-                    while (superstepState != SuperstepState.ALL_SUPERSTEPS_DONE) {
-                        long startSuperstepMillis = System.currentTimeMillis();
-                        cachedSuperstep = bspServiceMaster.getSuperstep();
-                        superstepState = bspServiceMaster.coordinateSuperstep();
-                        long superstepMillis = System.currentTimeMillis() -
-                                startSuperstepMillis;
-                        superstepSecsMap.put(new Long(cachedSuperstep),
-                                superstepMillis / 1000.0d);
-                        if (LOG.isInfoEnabled()) {
-                            LOG.info("masterThread: Coordination of superstep " +
-                                    cachedSuperstep + " took " +
-                                    superstepMillis / 1000.0d +
-                                    " seconds ended with state " + superstepState +
-                                    " and is now on superstep " +
-                                    bspServiceMaster.getSuperstep());
-                        }
-                        if (superstepCounterOn) {
-                            String counterPrefix;
-                            if (cachedSuperstep == -1) {
-                                counterPrefix = "Vertex input superstep";
-                            } else {
-                                counterPrefix = "Superstep " + cachedSuperstep;
-                            }
-                            context.getCounter(GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                                    counterPrefix +
-                                    " (milliseconds)").
-                                    increment(superstepMillis);
-                        }
-
-                        // If a worker failed, restart from a known good superstep
-                        if (superstepState == SuperstepState.WORKER_FAILURE) {
-                            bspServiceMaster.restartFromCheckpoint(
-                                    bspServiceMaster.getLastGoodCheckpoint());
-                        }
-                        endMillis = System.currentTimeMillis();
-                    }
-                    bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
-                }
+            // If a worker failed, restart from a known good superstep
+            if (superstepState == SuperstepState.WORKER_FAILURE) {
+              bspServiceMaster.restartFromCheckpoint(
+                  bspServiceMaster.getLastGoodCheckpoint());
             }
-            bspServiceMaster.cleanup();
-            if (!superstepSecsMap.isEmpty()) {
-                context.getCounter(
-                        GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                        "Shutdown (milliseconds)").
-                        increment(System.currentTimeMillis() - endMillis);
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("setup: Took " + setupSecs + " seconds.");
-                }
-                for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
-                    if (LOG.isInfoEnabled()) {
-                        if (entry.getKey().longValue() ==
-                                BspService.INPUT_SUPERSTEP) {
-                            LOG.info("vertex input superstep: Took " +
-                                     entry.getValue() + " seconds.");
-                        } else {
-                            LOG.info("superstep " + entry.getKey() + ": Took " +
-                                     entry.getValue() + " seconds.");
-                        }
-                    }
-                }
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("shutdown: Took " +
-                             (System.currentTimeMillis() - endMillis) /
-                             1000.0d + " seconds.");
-                    LOG.info("total: Took " +
-                             ((System.currentTimeMillis() / 1000.0d) -
-                             setupSecs) + " seconds.");
-                }
-                context.getCounter(
-                    GIRAPH_TIMERS_COUNTER_GROUP_NAME,
-                    "Total (milliseconds)").
-                    increment(System.currentTimeMillis() - startMillis);
+            endMillis = System.currentTimeMillis();
+          }
+          bspServiceMaster.setJobState(ApplicationState.FINISHED, -1, -1);
+        }
+      }
+      bspServiceMaster.cleanup();
+      if (!superstepSecsMap.isEmpty()) {
+        context.getCounter(
+            GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+            "Shutdown (milliseconds)").
+            increment(System.currentTimeMillis() - endMillis);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("setup: Took " + setupSecs + " seconds.");
+        }
+        for (Entry<Long, Double> entry : superstepSecsMap.entrySet()) {
+          if (LOG.isInfoEnabled()) {
+            if (entry.getKey().longValue() ==
+                BspService.INPUT_SUPERSTEP) {
+              LOG.info("vertex input superstep: Took " +
+                  entry.getValue() + " seconds.");
+            } else {
+              LOG.info("superstep " + entry.getKey() + ": Took " +
+                  entry.getValue() + " seconds.");
             }
-        } catch (Exception e) {
-            LOG.error("masterThread: Master algorithm failed: ", e);
-            throw new RuntimeException(e);
+          }
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("shutdown: Took " +
+              (System.currentTimeMillis() - endMillis) /
+              1000.0d + " seconds.");
+          LOG.info("total: Took " +
+              ((System.currentTimeMillis() / 1000.0d) -
+                  setupSecs) + " seconds.");
         }
+        context.getCounter(
+            GIRAPH_TIMERS_COUNTER_GROUP_NAME,
+            "Total (milliseconds)").
+            increment(System.currentTimeMillis() - startMillis);
+      }
+    } catch (IOException e) {
+      LOG.error("masterThread: Master algorithm failed with " +
+          "IOException ", e);
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      LOG.error("masterThread: Master algorithm failed with " +
+          "InterruptedException", e);
+      throw new IllegalStateException(e);
+    } catch (KeeperException e) {
+      LOG.error("masterThread: Master algorithm failed with " +
+          "KeeperException", e);
+      throw new IllegalStateException(e);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Thu Feb 16 22:12:31 2012
@@ -27,98 +27,107 @@ import java.util.Map;
 /**
  * Interface used by VertexReader to set the properties of a new vertex
  * or mutate the graph.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class MutableVertex<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable>
-        extends BasicVertex<I, V, E, M> {
-    /**
-     * Set the vertex id
-     *
-     * @param id Vertex id is set to this (instantiated by the user)
-     */
-    public abstract void setVertexId(I id);
-
-    /**
-     * Add an edge for this vertex (happens immediately)
-     *
-     * @param targetVertexId target vertex
-     * @param edgeValue value of the edge
-     * @return Return true if succeeded, false otherwise
-     */
-    public abstract boolean addEdge(I targetVertexId, E edgeValue);
-
-    /**
-     * Removes an edge for this vertex (happens immediately).
-     *
-     * @param targetVertexId the target vertex id of the edge to be removed.
-     * @return the value of the edge which was removed (or null if no
-     *         edge existed to targetVertexId)
-     */
-    public abstract E removeEdge(I targetVertexId);
-
-    /**
-     * Create a vertex to add to the graph.  Calls initialize() for the vertex
-     * as well.
-     *
-     * @return A new vertex for adding to the graph
-     */
-    public BasicVertex<I, V, E, M> instantiateVertex(
-        I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
-        MutableVertex<I, V, E, M> mutableVertex =
-            (MutableVertex<I, V, E, M>) BspUtils
-               .<I, V, E, M>createVertex(getContext().getConfiguration());
-        mutableVertex.setGraphState(getGraphState());
-        mutableVertex.initialize(vertexId, vertexValue, edges, messages);
-        return mutableVertex;
-    }
-
-    /**
-     * Sends a request to create a vertex that will be available during the
-     * next superstep.  Use instantiateVertex() to do the instantiation.
-     *
-     * @param vertex User created vertex
-     */
-    public void addVertexRequest(BasicVertex<I, V, E, M> vertex)
-            throws IOException {
-        getGraphState().getWorkerCommunications().
-            addVertexReq(vertex);
-    }
-
-    /**
-     * Request to remove a vertex from the graph
-     * (applied just prior to the next superstep).
-     *
-     * @param vertexId Id of the vertex to be removed.
-     */
-    public void removeVertexRequest(I vertexId) throws IOException {
-        getGraphState().getWorkerCommunications().
-        removeVertexReq(vertexId);
-    }
-
-    /**
-     * Request to add an edge of a vertex in the graph
-     * (processed just prior to the next superstep)
-     *
-     * @param sourceVertexId Source vertex id of edge
-     * @param edge Edge to add
-     */
-    public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
-            throws IOException {
-        getGraphState().getWorkerCommunications().
-            addEdgeReq(sourceVertexId, edge);
-    }
-
-    /**
-     * Request to remove an edge of a vertex from the graph
-     * (processed just prior to the next superstep).
-     *
-     * @param sourceVertexId Source vertex id of edge
-     * @param destVertexId Destination vertex id of edge
-     */
-    public void removeEdgeRequest(I sourceVertexId, I destVertexId)
-            throws IOException {
-        getGraphState().getWorkerCommunications().
-            removeEdgeReq(sourceVertexId, destVertexId);
-    }
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BasicVertex<I, V, E, M> {
+  /**
+   * Set the vertex id
+   *
+   * @param id Vertex id is set to this (instantiated by the user)
+   */
+  public abstract void setVertexId(I id);
+
+  /**
+   * Add an edge for this vertex (happens immediately)
+   *
+   * @param targetVertexId target vertex
+   * @param edgeValue value of the edge
+   * @return Return true if succeeded, false otherwise
+   */
+  public abstract boolean addEdge(I targetVertexId, E edgeValue);
+
+  /**
+   * Removes an edge for this vertex (happens immediately).
+   *
+   * @param targetVertexId the target vertex id of the edge to be removed.
+   * @return the value of the edge which was removed (or null if no
+   *         edge existed to targetVertexId)
+   */
+  public abstract E removeEdge(I targetVertexId);
+
+  /**
+   * Create a vertex to add to the graph.  Calls initialize() for the vertex
+   * as well.
+   *
+   * @param vertexId Id of the new vertex.
+   * @param vertexValue Value of the new vertex.
+   * @param edges Map of edges to be added to this vertex.
+   * @param messages Messages to be added to the vertex (typically empty)
+   * @return A new vertex for adding to the graph
+   */
+  public BasicVertex<I, V, E, M> instantiateVertex(
+      I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+    MutableVertex<I, V, E, M> mutableVertex =
+        (MutableVertex<I, V, E, M>) BspUtils
+        .<I, V, E, M>createVertex(getContext().getConfiguration());
+    mutableVertex.setGraphState(getGraphState());
+    mutableVertex.initialize(vertexId, vertexValue, edges, messages);
+    return mutableVertex;
+  }
+
+  /**
+   * Sends a request to create a vertex that will be available during the
+   * next superstep.  Use instantiateVertex() to do the instantiation.
+   *
+   * @param vertex User created vertex
+   */
+  public void addVertexRequest(BasicVertex<I, V, E, M> vertex)
+    throws IOException {
+    getGraphState().getWorkerCommunications().
+    addVertexReq(vertex);
+  }
+
+  /**
+   * Request to remove a vertex from the graph
+   * (applied just prior to the next superstep).
+   *
+   * @param vertexId Id of the vertex to be removed.
+   */
+  public void removeVertexRequest(I vertexId) throws IOException {
+    getGraphState().getWorkerCommunications().
+    removeVertexReq(vertexId);
+  }
+
+  /**
+   * Request to add an edge of a vertex in the graph
+   * (processed just prior to the next superstep)
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param edge Edge to add
+   */
+  public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
+    throws IOException {
+    getGraphState().getWorkerCommunications().
+    addEdgeReq(sourceVertexId, edge);
+  }
+
+  /**
+   * Request to remove an edge of a vertex from the graph
+   * (processed just prior to the next superstep).
+   *
+   * @param sourceVertexId Source vertex id of edge
+   * @param destVertexId Destination vertex id of edge
+   */
+  public void removeEdgeRequest(I sourceVertexId, I destVertexId)
+    throws IOException {
+    getGraphState().getWorkerCommunications().
+    removeEdgeReq(sourceVertexId, destVertexId);
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/TextAggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -30,92 +30,98 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 
 /**
- * Default implementation of {@link AggregatorWriter}. Each line consists of 
- * text and contains the aggregator name, the aggregator value and the 
+ * Default implementation of {@link AggregatorWriter}. Each line consists of
+ * text and contains the aggregator name, the aggregator value and the
  * aggregator class.
  */
-public class TextAggregatorWriter 
-        implements AggregatorWriter {
-    /** The filename of the outputfile */
-    public static final String FILENAME = 
-        "giraph.textAggregatorWriter.filename";
-    /** The frequency of writing:
-     *  - NEVER: never write, files aren't created at all
-     *  - AT_THE_END: aggregators are written only when the computation is over
-     *  - int: i.e. 1 is every superstep, 2 every two supersteps and so on 
-     */
-    public static final String FREQUENCY = 
-        "giraph.textAggregatorWriter.frequency";
-    private static final String DEFAULT_FILENAME = "aggregatorValues";
-    /** Signal for "never write" frequency */
-    public static final int NEVER = 0;
-    /** Signal for "write only the final values" frequency */
-    public static final int AT_THE_END = -1;
-    /** Handle to the outputfile */
-    protected FSDataOutputStream output;
-    private int frequency;
-    
-    @Override
-    @SuppressWarnings("rawtypes")
-    public void initialize(Context context, long attempt) throws IOException {
-        Configuration conf = context.getConfiguration();
-        frequency = conf.getInt(FREQUENCY, NEVER);
-        String filename  = conf.get(FILENAME, DEFAULT_FILENAME);
-        if (frequency != NEVER) {
-            Path p = new Path(filename+"_"+attempt);
-            FileSystem fs = FileSystem.get(conf);
-            if (fs.exists(p)) {
-                throw new RuntimeException("aggregatorWriter file already" +
-                    " exists: " + p.getName());
-            }
-            output = fs.create(p);
-        }
-    }
+public class TextAggregatorWriter implements AggregatorWriter {
+  /** The filename of the outputfile */
+  public static final String FILENAME =
+      "giraph.textAggregatorWriter.filename";
+  /** Signal for "never write" frequency */
+  public static final int NEVER = 0;
+  /** Signal for "write only the final values" frequency */
+  public static final int AT_THE_END = -1;
+  /** The frequency of writing:
+   *  - NEVER: never write, files aren't created at all
+   *  - AT_THE_END: aggregators are written only when the computation is over
+   *  - int: e.g. 1 is every superstep, 2 every two supersteps and so on
+   */
+  public static final String FREQUENCY =
+      "giraph.textAggregatorWriter.frequency";
+  /** Default filename for dumping aggregator values */
+  private static final String DEFAULT_FILENAME = "aggregatorValues";
+  /** Handle to the outputfile */
+  protected FSDataOutputStream output;
+  /** Write every "frequency" supersteps */
+  private int frequency;
 
-    @Override
-    final public void writeAggregator(
-            Map<String, Aggregator<Writable>> aggregators,
-            long superstep) throws IOException {
-        
-        if (shouldWrite(superstep)) {
-            for (Entry<String, Aggregator<Writable>> a: 
-                    aggregators.entrySet()) {
-                output.writeUTF(aggregatorToString(a.getKey(), 
-                                                   a.getValue(), 
-                                                   superstep));
-            }
-            output.flush();
-        }
-    }
-    
-    /**
-     * Implements the way an aggregator is converted into a String.
-     * Override this if you want to implement your own text format.
-     * 
-     * @param aggregatorName Name of the aggregator
-     * @param a Aggregator
-     * @param superstep Current superstep
-     * @return The String representation for the aggregator
-     */
-    protected String aggregatorToString(String aggregatorName, 
-                                        Aggregator<Writable> a,
-                                        long superstep) {
-
-        return new StringBuilder("superstep=").append(superstep).append("\t")
-            .append(aggregatorName).append("=").append(a.getAggregatedValue())
-            .append("\t").append(a.getClass().getCanonicalName()).append("\n")
-            .toString();
+  @Override
+  @SuppressWarnings("rawtypes")
+  public void initialize(Context context, long attempt) throws IOException {
+    Configuration conf = context.getConfiguration();
+    frequency = conf.getInt(FREQUENCY, NEVER);
+    String filename  = conf.get(FILENAME, DEFAULT_FILENAME);
+    if (frequency != NEVER) {
+      Path p = new Path(filename + "_" + attempt);
+      FileSystem fs = FileSystem.get(conf);
+      if (fs.exists(p)) {
+        throw new RuntimeException("aggregatorWriter file already" +
+            " exists: " + p.getName());
+      }
+      output = fs.create(p);
     }
+  }
 
-    private boolean shouldWrite(long superstep) {
-        return ((frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
-                (frequency != NEVER && superstep % frequency == 0));
+  @Override
+  public final void writeAggregator(
+      Map<String, Aggregator<Writable>> aggregators,
+      long superstep) throws IOException {
+    if (shouldWrite(superstep)) {
+      for (Entry<String, Aggregator<Writable>> a:
+        aggregators.entrySet()) {
+        output.writeUTF(aggregatorToString(a.getKey(),
+            a.getValue(),
+            superstep));
+      }
+      output.flush();
     }
+  }
+
+  /**
+   * Implements the way an aggregator is converted into a String.
+   * Override this if you want to implement your own text format.
+   *
+   * @param aggregatorName Name of the aggregator
+   * @param a Aggregator
+   * @param superstep Current superstep
+   * @return The String representation for the aggregator
+   */
+  protected String aggregatorToString(String aggregatorName,
+      Aggregator<Writable> a,
+      long superstep) {
+
+    return new StringBuilder("superstep=").append(superstep).append("\t")
+        .append(aggregatorName).append("=").append(a.getAggregatedValue())
+        .append("\t").append(a.getClass().getCanonicalName()).append("\n")
+        .toString();
+  }
+
+  /**
+   * Should write this superstep? A positive frequency writes on its
+   * multiples; AT_THE_END writes only on LAST_SUPERSTEP; NEVER never writes.
+   * @param superstep Superstep to check
+   * @return True if should write, false otherwise
+   */
+  private boolean shouldWrite(long superstep) {
+    return (frequency == AT_THE_END && superstep == LAST_SUPERSTEP) ||
+        (frequency > 0 && superstep % frequency == 0);
+  }
 
-    @Override
-    public void close() throws IOException {
-        if (output != null) {
-            output.close();
-        }
+  @Override
+  public void close() throws IOException {
+    if (output != null) {
+      output.close();
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexChanges.java Thu Feb 16 22:12:31 2012
@@ -33,40 +33,36 @@ import org.apache.hadoop.io.WritableComp
  * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
-public interface VertexChanges<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable> {
+public interface VertexChanges<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Get the added vertices for this particular vertex index from the previous
+   * superstep.
+   *
+   * @return List of vertices for this vertex index.
+   */
+  List<BasicVertex<I, V, E, M>> getAddedVertexList();
 
-    /**
-     * Get the added vertices for this particular vertex index from the previous
-     * superstep.
-     *
-     * @return List of vertices for this vertex index.
-     */
-    List<BasicVertex<I, V, E, M>> getAddedVertexList();
+  /**
+   * Get the number of times this vertex was removed in the previous
+   * superstep.
+   *
+   * @return Count of times this vertex was removed in the previous superstep
+   */
+  int getRemovedVertexCount();
 
-    /**
-     * Get the number of times this vertex was removed in the previous
-     * superstep.
-     *
-     * @return Count of time this vertex was removed in the previous superstep
-     */
-    int getRemovedVertexCount();
+  /**
+   * Get the added edges for this particular vertex index from the previous
+   * superstep
+   *
+   * @return List of added edges for this vertex index
+   */
+  List<Edge<I, E>> getAddedEdgeList();
 
-    /**
-     * Get the added edges for this particular vertex index from the previous
-     * superstep
-     *
-     * @return List of added edges for this vertex index
-     */
-    List<Edge<I, E>> getAddedEdgeList();
-
-    /**
-     * Get the removed edges by their destination vertex index.
-     *
-     * @return List of destination edges for removal from this vertex index
-     */
-    List<I> getRemovedEdgeList();
+  /**
+   * Get the removed edges by their destination vertex index.
+   *
+   * @return List of destination edges for removal from this vertex index
+   */
+  List<I> getRemovedEdgeList();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexCombiner.java Thu Feb 16 22:12:31 2012
@@ -26,23 +26,22 @@ import org.apache.hadoop.io.WritableComp
 /**
  * Abstract class to extend for combining of messages sent to the same vertex.
  *
- * @param <I extends Writable> index
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexCombiner<I extends WritableComparable,
-                                     M extends Writable> {
-
-   /**
-    * Combines message values for a particular vertex index.
-    *
-    * @param vertexIndex Index of the vertex getting these messages
-    * @param messages Iterable of the messages to be combined
-    * @return Iterable of the combined messages. The returned value cannot 
-    *         be null and its size is required to be smaller or equal to 
-    *         the size of the messages list.
-    * @throws IOException
-    */
-    public abstract Iterable<M> combine(I vertexIndex,
-            Iterable<M> messages) throws IOException;
+    M extends Writable> {
+  /**
+   * Combines message values for a particular vertex index.
+   *
+   * @param vertexIndex Index of the vertex getting these messages
+   * @param messages Iterable of the messages to be combined
+   * @return Iterable of the combined messages. The returned value cannot
+   *         be null and its size is required to be smaller or equal to
+   *         the size of the messages list.
+   * @throws IOException
+   */
+  public abstract Iterable<M> combine(I vertexIndex,
+      Iterable<M> messages) throws IOException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexEdgeCount.java Thu Feb 16 22:12:31 2012
@@ -22,45 +22,67 @@ package org.apache.giraph.graph;
  * Simple immutable structure for storing a final vertex and edge count.
  */
 public class VertexEdgeCount {
-    /** Immutable vertices */
-    private final long vertexCount;
-    /** Immutable edges */
-    private final long edgeCount;
-
-    public VertexEdgeCount() {
-        vertexCount = 0;
-        edgeCount = 0;
-    }
-
-    public VertexEdgeCount(long vertexCount, long edgeCount) {
-        this.vertexCount = vertexCount;
-        this.edgeCount = edgeCount;
-    }
-
-    public long getVertexCount() {
-        return vertexCount;
-    }
-
-    public long getEdgeCount() {
-        return edgeCount;
-    }
-
-    public VertexEdgeCount incrVertexEdgeCount(
-            VertexEdgeCount vertexEdgeCount) {
-        return new VertexEdgeCount(
-            vertexCount + vertexEdgeCount.getVertexCount(),
-            edgeCount + vertexEdgeCount.getEdgeCount());
-    }
-
-    public VertexEdgeCount incrVertexEdgeCount(
-            long vertexCount, long edgeCount) {
-        return new VertexEdgeCount(
-            this.vertexCount + vertexCount,
-            this.edgeCount + edgeCount);
-    }
-
-    @Override
-    public String toString() {
-        return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
-    }
+  /** Immutable vertices */
+  private final long vertexCount;
+  /** Immutable edges */
+  private final long edgeCount;
+
+  /**
+   * Default constructor.
+   */
+  public VertexEdgeCount() {
+    vertexCount = 0;
+    edgeCount = 0;
+  }
+
+  /**
+   * Constructor with initial values.
+   *
+   * @param vertexCount Final number of vertices.
+   * @param edgeCount Final number of edges.
+   */
+  public VertexEdgeCount(long vertexCount, long edgeCount) {
+    this.vertexCount = vertexCount;
+    this.edgeCount = edgeCount;
+  }
+
+  public long getVertexCount() {
+    return vertexCount;
+  }
+
+  public long getEdgeCount() {
+    return edgeCount;
+  }
+
+  /**
+   * Increment both the vertex and edge counts with a {@link VertexEdgeCount}.
+   *
+   * @param vertexEdgeCount add both the vertices and edges of this object.
+   * @return New immutable object with the new vertex and edge counts.
+   */
+  public VertexEdgeCount incrVertexEdgeCount(
+      VertexEdgeCount vertexEdgeCount) {
+    return new VertexEdgeCount(
+        vertexCount + vertexEdgeCount.getVertexCount(),
+        edgeCount + vertexEdgeCount.getEdgeCount());
+  }
+
+  /**
+   * Increment both the vertex and edge counts with primitives.
+   *
+   * @param vertexCount Add this many vertices.
+   * @param edgeCount Add this many edges.
+   * @return New immutable object with the new vertex and edge counts.
+   */
+  public VertexEdgeCount incrVertexEdgeCount(
+      long vertexCount, long edgeCount) {
+    return new VertexEdgeCount(
+        this.vertexCount + vertexCount,
+        this.edgeCount + edgeCount);
+  }
+
+  @Override
+  public String toString() {
+    return "(v=" + getVertexCount() + ", e=" + getEdgeCount() + ")";
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -36,45 +36,45 @@ import java.util.List;
  * @param <I> Vertex id
  * @param <V> Vertex value
  * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexInputFormat<I extends WritableComparable,
-        V extends Writable, E extends Writable, M extends Writable> {
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Logically split the vertices for a graph processing application.
+   *
+   * Each {@link InputSplit} is then assigned to a worker for processing.
+   *
+   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
+   * input files are not physically split into chunks. E.g., a split could
+   * be an <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
+   * also creates the {@link VertexReader} to read the {@link InputSplit}.
+   *
+   * Also, the number of workers is a hint given to the developer to try to
+   * intelligently determine how many splits to create (if this is
+   * adjustable) at runtime.
+   *
+   * @param context Context of the job
+   * @param numWorkers Number of workers used for this job
+   * @return a list of {@link InputSplit}s for the job.
+   */
+  public abstract List<InputSplit> getSplits(
+    JobContext context, int numWorkers)
+    throws IOException, InterruptedException;
 
-    /**
-     * Logically split the vertices for a graph processing application.
-     *
-     * Each {@link InputSplit} is then assigned to a worker for processing.
-     *
-     * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
-     * input files are not physically split into chunks. For e.g. a split could
-     * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
-     * also creates the {@link VertexReader} to read the {@link InputSplit}.
-     *
-     * Also, the number of workers is a hint given to the developer to try to
-     * intelligently determine how many splits to create (if this is
-     * adjustable) at runtime.
-     *
-     * @param context Context of the job
-     * @param numWorkers Number of workers used for this job
-     * @return an array of {@link InputSplit}s for the job.
-     */
-    public abstract List<InputSplit> getSplits(
-        JobContext context, int numWorkers)
-        throws IOException, InterruptedException;
-
-    /**
-     * Create a vertex reader for a given split. The framework will call
-     * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
-     * the split is used.
-     *
-     * @param split the split to be read
-     * @param context the information about the task
-     * @return a new record reader
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public abstract VertexReader<I, V, E, M> createVertexReader(
-        InputSplit split,
-        TaskAttemptContext context) throws IOException;
+  /**
+   * Create a vertex reader for a given split. The framework will call
+   * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
+   * the split is used.
+   *
+   * @param split the split to be read
+   * @param context the information about the task
+   * @return a new vertex reader
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VertexReader<I, V, E, M> createVertexReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java Thu Feb 16 22:12:31 2012
@@ -36,87 +36,85 @@ import org.json.JSONObject;
  * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
-public class VertexMutations<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable> implements VertexChanges<I, V, E, M> {
-    /** List of added vertices during the last superstep */
-    private final List<BasicVertex<I, V, E, M>> addedVertexList =
-        new ArrayList<BasicVertex<I, V, E, M>>();
-    /** Count of remove vertex requests */
-    private int removedVertexCount = 0;
-    /** List of added edges */
-    private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
-    /** List of removed edges */
-    private final List<I> removedEdgeList = new ArrayList<I>();
-
-    @Override
-    public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
-        return addedVertexList;
-    }
-
-    /**
-     * Add a vertex mutation
-     *
-     * @param vertex Vertex to be added
-     */
-    public void addVertex(BasicVertex<I, V, E, M> vertex) {
-        addedVertexList.add(vertex);
-    }
-
-    @Override
-    public int getRemovedVertexCount() {
-        return removedVertexCount;
-    }
-
-    /**
-     * Removed a vertex mutation (increments a count)
-     */
-    public void removeVertex() {
-        ++removedVertexCount;
-    }
-
-    @Override
-    public List<Edge<I, E>> getAddedEdgeList() {
-        return addedEdgeList;
-    }
-
-    /**
-     * Add an edge to this vertex
-     *
-     * @param edge Edge to be added
-     */
-    public void addEdge(Edge<I, E> edge) {
-        addedEdgeList.add(edge);
-    }
-
-    @Override
-    public List<I> getRemovedEdgeList() {
-        return removedEdgeList;
-    }
-
-    /**
-     * Remove an edge on this vertex
-     *
-     * @param destinationVertexId Vertex index of the destination of the edge
-     */
-    public void removeEdge(I destinationVertexId) {
-        removedEdgeList.add(destinationVertexId);
-    }
-
-    @Override
-    public String toString() {
-        JSONObject jsonObject = new JSONObject();
-        try {
-            jsonObject.put("added vertices", getAddedVertexList().toString());
-            jsonObject.put("added edges", getAddedEdgeList().toString());
-            jsonObject.put("removed vertex count", getRemovedVertexCount());
-            jsonObject.put("removed edges", getRemovedEdgeList().toString());
-            return jsonObject.toString();
-        } catch (JSONException e) {
-            throw new IllegalStateException("toString: Got a JSON exception",
-                                            e);
-        }
+public class VertexMutations<I extends WritableComparable,
+    V extends Writable, E extends Writable,
+    M extends Writable> implements VertexChanges<I, V, E, M> {
+  /** List of added vertices during the last superstep */
+  private final List<BasicVertex<I, V, E, M>> addedVertexList =
+      new ArrayList<BasicVertex<I, V, E, M>>();
+  /** Count of remove vertex requests */
+  private int removedVertexCount = 0;
+  /** List of added edges */
+  private final List<Edge<I, E>> addedEdgeList = new ArrayList<Edge<I, E>>();
+  /** List of removed edges */
+  private final List<I> removedEdgeList = new ArrayList<I>();
+
+  @Override
+  public List<BasicVertex<I, V, E, M>> getAddedVertexList() {
+    return addedVertexList;
+  }
+
+  /**
+   * Add a vertex mutation
+   *
+   * @param vertex Vertex to be added
+   */
+  public void addVertex(BasicVertex<I, V, E, M> vertex) {
+    addedVertexList.add(vertex);
+  }
+
+  @Override
+  public int getRemovedVertexCount() {
+    return removedVertexCount;
+  }
+
+  /**
+   * Removed a vertex mutation (increments a count)
+   */
+  public void removeVertex() {
+    ++removedVertexCount;
+  }
+
+  @Override
+  public List<Edge<I, E>> getAddedEdgeList() {
+    return addedEdgeList;
+  }
+
+  /**
+   * Add an edge to this vertex
+   *
+   * @param edge Edge to be added
+   */
+  public void addEdge(Edge<I, E> edge) {
+    addedEdgeList.add(edge);
+  }
+
+  @Override
+  public List<I> getRemovedEdgeList() {
+    return removedEdgeList;
+  }
+
+  /**
+   * Remove an edge on this vertex
+   *
+   * @param destinationVertexId Vertex index of the destination of the edge
+   */
+  public void removeEdge(I destinationVertexId) {
+    removedEdgeList.add(destinationVertexId);
+  }
+
+  @Override
+  public String toString() {
+    JSONObject jsonObject = new JSONObject();
+    try {
+      jsonObject.put("added vertices", getAddedVertexList().toString());
+      jsonObject.put("added edges", getAddedEdgeList().toString());
+      jsonObject.put("removed vertex count", getRemovedVertexCount());
+      jsonObject.put("removed edges", getRemovedEdgeList().toString());
+      return jsonObject.toString();
+    } catch (JSONException e) {
+      throw new IllegalStateException("toString: Got a JSON exception",
+          e);
     }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexOutputFormat.java Thu Feb 16 22:12:31 2012
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.Outpu
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 /**
@@ -38,45 +37,45 @@ import org.apache.hadoop.mapreduce.TaskA
  */
 @SuppressWarnings("rawtypes")
 public abstract class VertexOutputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable> {
-    /**
-     * Create a vertex writer for a given split. The framework will call
-     * {@link VertexReader#initialize(InputSplit, TaskAttemptContext)} before
-     * the split is used.
-     *
-     * @param context the information about the task
-     * @return a new vertex writer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public abstract VertexWriter<I, V, E> createVertexWriter(
-        TaskAttemptContext context) throws IOException, InterruptedException;
+    I extends WritableComparable, V extends Writable, E extends Writable> {
+  /**
+   * Create a vertex writer for the given task. The framework will call
+   * {@link VertexWriter#initialize(TaskAttemptContext)} before
+   * the writer is used.
+   *
+   * @param context the information about the task
+   * @return a new vertex writer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract VertexWriter<I, V, E> createVertexWriter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
 
-    /**
-     * Check for validity of the output-specification for the job.
-     * (Copied from Hadoop OutputFormat)
-     *
-     * <p>This is to validate the output specification for the job when it is
-     * a job is submitted.  Typically checks that it does not already exist,
-     * throwing an exception when it already exists, so that output is not
-     * overwritten.</p>
-     *
-     * @param context information about the job
-     * @throws IOException when output should not be attempted
-     */
-    public abstract void checkOutputSpecs(JobContext context)
-        throws IOException, InterruptedException;
+  /**
+   * Check for validity of the output-specification for the job.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * <p>This is to validate the output specification for the job when the
+   * job is submitted.  Typically checks that the output does not already
+   * exist, throwing an exception when it does, so that output is not
+   * overwritten.</p>
+   *
+   * @param context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  public abstract void checkOutputSpecs(JobContext context)
+    throws IOException, InterruptedException;
 
-    /**
-     * Get the output committer for this output format. This is responsible
-     * for ensuring the output is committed correctly.
-     * (Copied from Hadoop OutputFormat)
-     *
-     * @param context the task context
-     * @return an output committer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public abstract OutputCommitter getOutputCommitter(
-        TaskAttemptContext context) throws IOException, InterruptedException;
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * (Copied from Hadoop OutputFormat)
+   *
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public abstract OutputCommitter getOutputCommitter(
+    TaskAttemptContext context) throws IOException, InterruptedException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java Thu Feb 16 22:12:31 2012
@@ -25,54 +25,63 @@ import org.apache.hadoop.mapreduce.TaskA
 
 import java.io.IOException;
 
+/**
+ * Analogous to {@link RecordReader} for vertices.  Will read the vertices
+ * from an input split.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
 @SuppressWarnings("rawtypes")
-public interface VertexReader<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable> {
-    /**
-     * Use the input split and context to setup reading the vertices.
-     * Guaranteed to be called prior to any other function.
-     *
-     * @param inputSplit
-     * @param context
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void initialize(InputSplit inputSplit, TaskAttemptContext context)
-        throws IOException, InterruptedException;
-
-    /**
-     *
-     * @return false iff there are no more vertices
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    boolean nextVertex() throws IOException, InterruptedException;
-
-    /**
-     *
-     * @return the current vertex which has been read.  nextVertex() should be called first.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException;
-
-    /**
-     * Close this {@link VertexReader} to future operations.
-     *
-     * @throws IOException
-     */
-    void close() throws IOException;
-
-    /**
-     * How much of the input has the {@link VertexReader} consumed i.e.
-     * has been processed by?
-     *
-     * @return Progress from <code>0.0</code> to <code>1.0</code>.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    float getProgress() throws IOException, InterruptedException;
+public interface VertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Use the input split and context to setup reading the vertices.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param inputSplit Input split to be used for reading vertices.
+   * @param context Context from the task.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void initialize(InputSplit inputSplit, TaskAttemptContext context)
+    throws IOException, InterruptedException;
+
+  /**
+   * Read the next vertex, if one is available.
+   * @return false iff there are no more vertices
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean nextVertex() throws IOException, InterruptedException;
+
+  /**
+   * Get the current vertex.
+   *
+   * @return the current vertex which has been read.
+   *         nextVertex() should be called first.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  BasicVertex<I, V, E, M> getCurrentVertex()
+    throws IOException, InterruptedException;
+
+  /**
+   * Close this {@link VertexReader} to future operations.
+   *
+   * @throws IOException
+   */
+  void close() throws IOException;
+
+  /**
+   * How much of the input has the {@link VertexReader} consumed, i.e.
+   * how much has been processed so far?
+   *
+   * @return Progress from <code>0.0</code> to <code>1.0</code>.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  float getProgress() throws IOException, InterruptedException;
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Thu Feb 16 22:12:31 2012
@@ -31,110 +31,113 @@ import java.util.List;
  * Default implementation of how to resolve vertex creation/removal, messages
  * to nonexistent vertices, etc.
  *
- * @param <I>
- * @param <V>
- * @param <E>
- * @param <M>
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class VertexResolver<I extends WritableComparable, V extends Writable,
-        E extends Writable, M extends Writable>
-        implements BasicVertexResolver<I, V, E, M>, Configurable {
-    /** Configuration */
-    private Configuration conf = null;
-
-    private GraphState<I,V,E,M> graphState;
-
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(VertexResolver.class);
-
-    @Override
-    public BasicVertex<I, V, E, M> resolve(
-            I vertexId,
-            BasicVertex<I, V, E, M> vertex,
-            VertexChanges<I, V, E, M> vertexChanges,
-            Iterable<M> messages) {
-        // Default algorithm:
-        // 1. If the vertex exists, first prune the edges
-        // 2. If vertex removal desired, remove the vertex.
-        // 3. If creation of vertex desired, pick first vertex
-        // 4. If vertex doesn't exist, but got messages, create
-        // 5. If edge addition, add the edges
-        if (vertex != null) {
-            if (vertexChanges != null) {
-                List<I> removedEdgeList = vertexChanges.getRemovedEdgeList();
-                for (I removedDestVertex : removedEdgeList) {
-                    E removeEdge =
-                        ((MutableVertex<I, V, E, M>) vertex).removeEdge(
-                            removedDestVertex);
-                    if (removeEdge == null) {
-                        LOG.warn("resolve: Failed to remove edge with " +
-                                 "destination " + removedDestVertex + "on " +
-                                 vertex + " since it doesn't exist.");
-                    }
-                }
-                if (vertexChanges.getRemovedVertexCount() > 0) {
-                    vertex = null;
-                }
-            }
+    E extends Writable, M extends Writable>
+    implements BasicVertexResolver<I, V, E, M>, Configurable {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(VertexResolver.class);
+  /** Configuration */
+  private Configuration conf = null;
+  /** Stored graph state */
+  private GraphState<I, V, E, M> graphState;
+
+  @Override
+  public BasicVertex<I, V, E, M> resolve(
+      I vertexId,
+      BasicVertex<I, V, E, M> vertex,
+      VertexChanges<I, V, E, M> vertexChanges,
+      Iterable<M> messages) {
+    // Default algorithm:
+    // 1. If the vertex exists, first prune the edges
+    // 2. If vertex removal desired, remove the vertex.
+    // 3. If creation of vertex desired, pick first vertex
+    // 4. If vertex doesn't exist, but got messages, create
+    // 5. If edge addition, add the edges
+    if (vertex != null) {
+      if (vertexChanges != null) {
+        List<I> removedEdgeList = vertexChanges.getRemovedEdgeList();
+        for (I removedDestVertex : removedEdgeList) {
+          E removeEdge =
+              ((MutableVertex<I, V, E, M>) vertex).removeEdge(
+                  removedDestVertex);
+          if (removeEdge == null) {
+            LOG.warn("resolve: Failed to remove edge with " +
+                "destination " + removedDestVertex + "on " +
+                vertex + " since it doesn't exist.");
+          }
         }
-
-        if (vertex == null) {
-            if (vertexChanges != null) {
-                if (!vertexChanges.getAddedVertexList().isEmpty()) {
-                    vertex = vertexChanges.getAddedVertexList().get(0);
-                }
-            }
-            if (vertex == null && messages != null
-                    && !Iterables.isEmpty(messages)) {
-                vertex = instantiateVertex();
-                vertex.initialize(vertexId,
-                                  BspUtils.<V>createVertexValue(getConf()),
-                                  null,
-                                  messages);
-            }
-        } else {
-            if ((vertexChanges != null) &&
-                    (!vertexChanges.getAddedVertexList().isEmpty())) {
-                LOG.warn("resolve: Tried to add a vertex with id = " +
-                         vertex.getVertexId() + " when one already " +
-                        "exists.  Ignoring the add vertex request.");
-            }
-        }
-
-        if (vertexChanges != null &&
-                !vertexChanges.getAddedEdgeList().isEmpty()) {
-            MutableVertex<I, V, E, M> mutableVertex =
-                (MutableVertex<I, V, E, M>) vertex;
-            for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
-                edge.setConf(getConf());
-                mutableVertex.addEdge(edge.getDestVertexId(),
-                                      edge.getEdgeValue());
-            }
+        if (vertexChanges.getRemovedVertexCount() > 0) {
+          vertex = null;
         }
-
-        return vertex;
+      }
     }
 
-    @Override
-    public BasicVertex<I, V, E, M> instantiateVertex() {
-        BasicVertex<I, V, E, M> vertex =
-            BspUtils.<I, V, E, M>createVertex(getConf());
-        vertex.setGraphState(graphState);
-        return vertex;
+    if (vertex == null) {
+      if (vertexChanges != null) {
+        if (!vertexChanges.getAddedVertexList().isEmpty()) {
+          vertex = vertexChanges.getAddedVertexList().get(0);
+        }
+      }
+      if (vertex == null && messages != null && !Iterables.isEmpty(messages)) {
+        vertex = instantiateVertex();
+        vertex.initialize(vertexId,
+            BspUtils.<V>createVertexValue(getConf()),
+            null,
+            messages);
+      }
+    } else {
+      if ((vertexChanges != null) &&
+          (!vertexChanges.getAddedVertexList().isEmpty())) {
+        LOG.warn("resolve: Tried to add a vertex with id = " +
+            vertex.getVertexId() + " when one already " +
+            "exists.  Ignoring the add vertex request.");
+      }
     }
 
-    @Override
-    public Configuration getConf() {
-        return conf;
+    if (vertexChanges != null &&
+        !vertexChanges.getAddedEdgeList().isEmpty()) {
+      MutableVertex<I, V, E, M> mutableVertex =
+          (MutableVertex<I, V, E, M>) vertex;
+      for (Edge<I, E> edge : vertexChanges.getAddedEdgeList()) {
+        edge.setConf(getConf());
+        mutableVertex.addEdge(edge.getDestVertexId(),
+            edge.getEdgeValue());
+      }
     }
 
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
+    return vertex;
+  }
 
-    public void setGraphState(GraphState<I, V, E, M> graphState) {
-      this.graphState = graphState;
-    }
+  @Override
+  public BasicVertex<I, V, E, M> instantiateVertex() {
+    BasicVertex<I, V, E, M> vertex =
+        BspUtils.<I, V, E, M>createVertex(getConf());
+    vertex.setGraphState(graphState);
+    return vertex;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Set the graph state.
+   *
+   * @param graphState Graph state saved.
+   */
+  public void setGraphState(GraphState<I, V, E, M> graphState) {
+    this.graphState = graphState;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexWriter.java Thu Feb 16 22:12:31 2012
@@ -32,36 +32,34 @@ import org.apache.hadoop.mapreduce.TaskA
  * @param <E> Edge value
  */
 @SuppressWarnings("rawtypes")
-public interface VertexWriter<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable> {
-    /**
-     * Use the context to setup writing the vertices.
-     * Guaranteed to be called prior to any other function.
-     *
-     * @param context
-     * @throws IOException
-     */
-    void initialize(TaskAttemptContext context) throws IOException;
+public interface VertexWriter<I extends WritableComparable, V extends Writable,
+    E extends Writable> {
+  /**
+   * Use the context to setup writing the vertices.
+   * Guaranteed to be called prior to any other function.
+   *
+   * @param context Context used to write the vertices.
+   * @throws IOException
+   */
+  void initialize(TaskAttemptContext context) throws IOException;
 
-    /**
-     * Writes the next vertex and associated data
-     *
-     * @param vertex set the properties of this vertex
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void writeVertex(BasicVertex<I, V, E, ?> vertex)
-        throws IOException, InterruptedException;
+  /**
+   * Writes the next vertex and associated data
+   *
+   * @param vertex Vertex to be written
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void writeVertex(BasicVertex<I, V, E, ?> vertex)
+    throws IOException, InterruptedException;
 
-    /**
-     * Close this {@link VertexWriter} to future operations.
-     *
-     * @param context the context of the task
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    void close(TaskAttemptContext context)
-        throws IOException, InterruptedException;
+  /**
+   * Close this {@link VertexWriter} to future operations.
+   *
+   * @param context the context of the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void close(TaskAttemptContext context)
+    throws IOException, InterruptedException;
 }