You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC

svn commit: r1245205 [4/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Thu Feb 16 22:12:31 2012
@@ -34,100 +34,98 @@ import org.apache.hadoop.ipc.VersionedPr
 /**
  * Basic interface for communication between workers.
  *
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 /*if_not[HADOOP]
  else[HADOOP]*/
 @TokenInfo(BspTokenSelector.class)
 /*end[HADOOP]*/
-public interface CommunicationsInterface<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends VersionedProtocol {
-
-    /**
-     * Interface Version History
-     *
-     * 0 - First Version
-     */
-    static final long versionID = 0L;
-
-    /**
-     * Adds incoming message.
-     *
-     * @param vertexIndex
-     * @param msg
-     * @throws IOException
-     */
-    void putMsg(I vertexIndex, M msg) throws IOException;
-
-    /**
-     * Adds incoming message list.
-     *
-     * @param vertexIndex Vertex index where the message are added
-     * @param msgList messages added
-     * @throws IOException
-     */
-    void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
-
-    /**
-     * Adds a list of vertex ids and their respective message lists.
-     *
-     * @param vertexIdMessagesList messages to be added
-     * @throws IOException
-     */
-    void putVertexIdMessagesList(
-        VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
-
-    /**
-     * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
-     *
-     * @param partitionId Partition id of the vertices to be added.
-     * @param vertexList List of vertices to add
-     */
-    void putVertexList(int partitionId,
-                       VertexList<I, V, E, M> vertexList) throws IOException;
-
-    /**
-     * Add an edge to a remote vertex
-     *
-     * @param vertexIndex Vertex index where the edge is added
-     * @param edge Edge to be added
-     * @throws IOException
-     */
-    void addEdge(I vertexIndex, Edge<I, E> edge) throws IOException;
-
-    /**
-     * Remove an edge on a remote vertex
-     *
-     * @param vertexIndex Vertex index where the edge is added
-     * @param destinationVertexIndex Edge vertex index to be removed
-     * @throws IOException
-     */
-    void removeEdge(I vertexIndex, I destinationVertexIndex) throws IOException;
-
-    /**
-     * Add a remote vertex
-     *
-     * @param vertex Vertex that will be added
-     * @throws IOException
-     */
-    void addVertex(BasicVertex<I, V, E, M> vertex) throws IOException;
-
-    /**
-     * Removed a remote vertex
-     *
-     * @param vertexIndex Vertex index representing vertex to be removed
-     * @throws IOException
-     */
-    void removeVertex(I vertexIndex) throws IOException;
-
-    /**
-     * @return The name of this worker in the format "hostname:port".
-     */
-    String getName();
+public interface CommunicationsInterface<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends VersionedProtocol {
+  /**
+   * Interface Version History
+   *
+   * 0 - First Version
+   */
+  long VERSION_ID = 0L;
+
+  /**
+   * Adds incoming message.
+   *
+   * @param vertexIndex Destination vertex index.
+   * @param message Message to store.
+   * @throws IOException
+   */
+  void putMsg(I vertexIndex, M message) throws IOException;
+
+  /**
+   * Adds incoming message list.
+   *
+   * @param vertexIndex Vertex index where the message are added
+   * @param msgList messages added
+   * @throws IOException
+   */
+  void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
+
+  /**
+   * Adds a list of vertex ids and their respective message lists.
+   *
+   * @param vertexIdMessagesList messages to be added
+   * @throws IOException
+   */
+  void putVertexIdMessagesList(
+      VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
+
+  /**
+   * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
+   *
+   * @param partitionId Partition id of the vertices to be added.
+   * @param vertexList List of vertices to add
+   */
+  void putVertexList(int partitionId,
+      VertexList<I, V, E, M> vertexList) throws IOException;
+
+  /**
+   * Add an edge to a remote vertex
+   *
+   * @param vertexIndex Vertex index where the edge is added
+   * @param edge Edge to be added
+   * @throws IOException
+   */
+  void addEdge(I vertexIndex, Edge<I, E> edge) throws IOException;
+
+  /**
+   * Remove an edge on a remote vertex
+   *
+   * @param vertexIndex Vertex index where the edge is added
+   * @param destinationVertexIndex Edge vertex index to be removed
+   * @throws IOException
+   */
+  void removeEdge(I vertexIndex, I destinationVertexIndex) throws IOException;
+
+  /**
+   * Add a remote vertex
+   *
+   * @param vertex Vertex that will be added
+   * @throws IOException
+   */
+  void addVertex(BasicVertex<I, V, E, M> vertex) throws IOException;
+
+  /**
+   * Removed a remote vertex
+   *
+   * @param vertexIndex Vertex index representing vertex to be removed
+   * @throws IOException
+   */
+  void removeVertex(I vertexIndex) throws IOException;
+
+  /**
+   * @return The name of this worker in the format "hostname:port".
+   */
+  String getName();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java Thu Feb 16 22:12:31 2012
@@ -27,22 +27,29 @@ import org.apache.hadoop.io.Writable;
  *
  * @param <M> message type
  */
-public class MsgList<M extends Writable>
-    extends ArrayListWritable<M> {
-    /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 100L;
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+  /** Defining a layout version for a serializable class. */
+  private static final long serialVersionUID = 100L;
 
-    public MsgList() {
-        super();
-    }
-    
-    public MsgList(MsgList<M> msgList) {
-        super(msgList);
-    }
+  /**
+   * Default constructor.
+   */
+  public MsgList() {
+    super();
+  }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setClass() {
-        setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
-    }
+  /**
+   * Copy constructor.
+   *
+   * @param msgList List of messages for writing.
+   */
+  public MsgList(MsgList<M> msgList) {
+    super(msgList);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setClass() {
+    setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Thu Feb 16 22:12:31 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm;
 import java.io.IOException;
 
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 
 /*if_not[HADOOP]
 else[HADOOP]*/
@@ -48,119 +47,155 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.mapreduce.Mapper;
 
+/**
+ * Used to implement abstract {@link BasicRPCCommunications} methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
 @SuppressWarnings("rawtypes")
-public class RPCCommunications<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-/*if_not[HADOOP]
-extends BasicRPCCommunications<I, V, E, M, Object> {
-else[HADOOP]*/
-        extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
-/*end[HADOOP]*/
-
-    /** Class logger */
-    public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+public class RPCCommunications<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+  /*if_not[HADOOP]
+    extends BasicRPCCommunications<I, V, E, M, Object> {
+    else[HADOOP]*/
+    extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
+  /*end[HADOOP]*/
+
+  /** Class logger */
+  public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+
+  /**
+   * Constructor.
+   *
+   * @param context Context to be saved.
+   * @param service Server worker.
+   * @param graphState Graph state from infrastructure.
+   * @throws IOException
+   * @throws UnknownHostException
+   * @throws InterruptedException
+   */
+  public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+      CentralizedServiceWorker<I, V, E, M> service,
+      GraphState<I, V, E, M> graphState) throws
+      IOException, InterruptedException {
+    super(context, service);
+  }
 
-    public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
-                             CentralizedServiceWorker<I, V, E, M> service,
-                             GraphState<I, V, E, M> graphState)
-            throws IOException, UnknownHostException, InterruptedException {
-        super(context, service);
-    }
-
-/*if_not[HADOOP]
+  /*if_not[HADOOP]
     protected Object createJobToken() throws IOException {
         return null;
     }
-else[HADOOP]*/
-    protected Token<JobTokenIdentifier> createJobToken() throws IOException {
-        String localJobTokenFile = System.getenv().get(
-                UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
-        if (localJobTokenFile != null) {
-            Credentials credentials =
-                TokenCache.loadTokens(localJobTokenFile, conf);
-            return TokenCache.getJobToken(credentials);
-        }
-        return null;
-    }
-/*end[HADOOP]*/
-
-    protected Server getRPCServer(
-            InetSocketAddress myAddress, int numHandlers, String jobId,
-/*if_not[HADOOP]
+    else[HADOOP]*/
+  /**
+   * Create the job token.
+   *
+   * @return Job token.
+   */
+  protected Token<JobTokenIdentifier> createJobToken() throws IOException {
+    String localJobTokenFile = System.getenv().get(
+        UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+    if (localJobTokenFile != null) {
+      Credentials credentials =
+          TokenCache.loadTokens(localJobTokenFile, conf);
+      return TokenCache.getJobToken(credentials);
+    }
+    return null;
+  }
+  /*end[HADOOP]*/
+
+  /**
+   * Get the RPC server.
+   *
+   * @param myAddress My address.
+   * @param numHandlers Number of handler threads.
+   * @param jobId Job id.
+   * @param jt Jobtoken indentifier.
+   * @return RPC server.
+   */
+  protected Server getRPCServer(
+      InetSocketAddress myAddress, int numHandlers, String jobId,
+      /*if_not[HADOOP]
             Object jt) throws IOException {
         return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
             numHandlers, false, conf);
     }
-else[HADOOP]*/
-            Token<JobTokenIdentifier> jt) throws IOException {
-        @SuppressWarnings("deprecation")
-        String hadoopSecurityAuthorization =
-            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
-        if (conf.getBoolean(
-                    hadoopSecurityAuthorization,
-                    false)) {
-            ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
-        }
-        JobTokenSecretManager jobTokenSecretManager =
-            new JobTokenSecretManager();
-        if (jt != null) { //could be null in the case of some unit tests
-            jobTokenSecretManager.addTokenForJob(jobId, jt);
-            if (LOG.isInfoEnabled()) {
-                LOG.info("getRPCServer: Added jobToken " + jt);
-            }
-        }
-        return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
-                numHandlers, false, conf, jobTokenSecretManager);
-    }
-/*end[HADOOP]*/
-
-    protected CommunicationsInterface<I, V, E, M> getRPCProxy(
-            final InetSocketAddress addr,
-            String jobId,
-/*if_not[HADOOP]
-            Object jt)
-else[HADOOP]*/
-            Token<JobTokenIdentifier> jt)
-/*end[HADOOP]*/
-            throws IOException, InterruptedException {
-        final Configuration config = new Configuration(conf);
-
-/*if_not[HADOOP]
+      else[HADOOP]*/
+      Token<JobTokenIdentifier> jt) throws IOException {
+    @SuppressWarnings("deprecation")
+    String hadoopSecurityAuthorization =
+      ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
+    if (conf.getBoolean(
+        hadoopSecurityAuthorization,
+        false)) {
+      ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
+    }
+    JobTokenSecretManager jobTokenSecretManager =
+        new JobTokenSecretManager();
+    if (jt != null) { //could be null in the case of some unit tests
+      jobTokenSecretManager.addTokenForJob(jobId, jt);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getRPCServer: Added jobToken " + jt);
+      }
+    }
+    return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
+        numHandlers, false, conf, jobTokenSecretManager);
+  }
+  /*end[HADOOP]*/
+
+  /**
+   * Get the RPC proxy.
+   *
+   * @param addr Address of the RPC server.
+   * @param jobId Job id.
+   * @param jt Job token.
+   * @return Proxy of the RPC server.
+   */
+  protected CommunicationsInterface<I, V, E, M> getRPCProxy(
+    final InetSocketAddress addr,
+    String jobId,
+    /*if_not[HADOOP]
+    Object jt)
+      else[HADOOP]*/
+    Token<JobTokenIdentifier> jt)
+    /*end[HADOOP]*/
+    throws IOException, InterruptedException {
+    final Configuration config = new Configuration(conf);
+    /*if_not[HADOOP]
         @SuppressWarnings("unchecked")
         CommunicationsInterface<I, V, E, M> proxy =
             (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
                  CommunicationsInterface.class, versionID, addr, config);
         return proxy;
-else[HADOOP]*/
-        if (jt == null) {
-            @SuppressWarnings("unchecked")
-            CommunicationsInterface<I, V, E, M> proxy =
-                (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
-                     CommunicationsInterface.class, versionID, addr, config);
-            return proxy;
+      else[HADOOP]*/
+    if (jt == null) {
+      @SuppressWarnings("unchecked")
+      CommunicationsInterface<I, V, E, M> proxy =
+        (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+          CommunicationsInterface.class, VERSION_ID, addr, config);
+      return proxy;
+    }
+    jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
+        addr.getPort()));
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    current.addToken(jt);
+    UserGroupInformation owner =
+        UserGroupInformation.createRemoteUser(jobId);
+    owner.addToken(jt);
+    @SuppressWarnings("unchecked")
+    CommunicationsInterface<I, V, E, M> proxy =
+      owner.doAs(new PrivilegedExceptionAction<
+        CommunicationsInterface<I, V, E, M>>() {
+        @Override
+        public CommunicationsInterface<I, V, E, M> run() throws Exception {
+          // All methods in CommunicationsInterface will be used for RPC
+          return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+            CommunicationsInterface.class, VERSION_ID, addr, config);
         }
-        jt.setService(new Text(addr.getAddress().getHostAddress() + ":"
-                               + addr.getPort()));
-        UserGroupInformation current = UserGroupInformation.getCurrentUser();
-        current.addToken(jt);
-        UserGroupInformation owner =
-            UserGroupInformation.createRemoteUser(jobId);
-        owner.addToken(jt);
-        @SuppressWarnings("unchecked")
-        CommunicationsInterface<I, V, E, M> proxy =
-                      owner.doAs(new PrivilegedExceptionAction<
-                              CommunicationsInterface<I, V, E, M>>() {
-            @Override
-            public CommunicationsInterface<I, V, E, M> run() throws Exception {
-                // All methods in CommunicationsInterface will be used for RPC
-                return (CommunicationsInterface<I, V, E, M> )RPC.getProxy(
-                    CommunicationsInterface.class, versionID, addr, config);
-            }
-        });
-        return proxy;
-/*end[HADOOP]*/
-    }
+      });
+    return proxy;
+    /*end[HADOOP]*/
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java Thu Feb 16 22:12:31 2012
@@ -26,45 +26,47 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 
 /**
- * Interface for message communication server
+ * Interface for message communication server.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface ServerInterface<I extends WritableComparable,
-                                 V extends Writable,
-                                 E extends Writable,
-                                 M extends Writable>
-                                 extends Closeable,
-                                 WorkerCommunications<I, V, E, M> {
-    /**
-     *  Setup the server.
-     */
-    void setup();
+    V extends Writable, E extends Writable, M extends Writable>
+    extends Closeable, WorkerCommunications<I, V, E, M> {
+  /**
+   *  Setup the server.
+   */
+  void setup();
 
-    /**
-     * Move the in transition messages to the in messages for every vertex and
-     * add new connections to any newly appearing RPC proxies.
-     */
-    void prepareSuperstep();
+  /**
+   * Move the in transition messages to the in messages for every vertex and
+   * add new connections to any newly appearing RPC proxies.
+   */
+  void prepareSuperstep();
 
-    /**
-     * Flush all outgoing messages.  This will synchronously ensure that all
-     * messages have been send and delivered prior to returning.
-     *
-     * @param context Context used to signal process
-     * @return Number of messages sent during the last superstep
-     * @throws IOException
-     */
-    long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
+  /**
+   * Flush all outgoing messages.  This will synchronously ensure that all
+   * messages have been send and delivered prior to returning.
+   *
+   * @param context Context used to signal process
+   * @return Number of messages sent during the last superstep
+   * @throws IOException
+   */
+  long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
 
-    /**
-     * Closes all connections.
-     *
-     * @throws IOException
-     */
-    void closeConnections() throws IOException;
+  /**
+   * Closes all connections.
+   *
+   * @throws IOException
+   */
+  void closeConnections() throws IOException;
 
-    /**
-     * Shuts down.
-     */
-    void close();
+  /**
+   * Shuts down.
+   */
+  void close();
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java Thu Feb 16 22:12:31 2012
@@ -32,65 +32,75 @@ import org.apache.hadoop.io.WritableComp
  * This object is only used for transporting list of vertices and their
  * respective messages to a destination RPC server.
  *
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class VertexIdMessages<I extends WritableComparable, M extends Writable>
-        implements Writable, Configurable {
-    /** Vertex id */
-    private I vertexId;
-    /** Message list corresponding to vertex id */
-    private MsgList<M> msgList;
-    /** Configuration from Configurable */
-    private Configuration conf;
-
-    /**
-     * Reflective constructor.
-     */
-    public VertexIdMessages() {}
-
-    /**
-     * Constructor used with creating initial values.
-     *
-     * @param vertexId Vertex id to be sent
-     * @param msgList Mesage list for the vertex id to be sent
-     */
-    public VertexIdMessages(I vertexId, MsgList<M> msgList) {
-        this.vertexId = vertexId;
-        this.msgList = msgList;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        vertexId = BspUtils.<I>createVertexIndex(getConf());
-        vertexId.readFields(input);
-        msgList = new MsgList<M>();
-        msgList.setConf(getConf());
-        msgList.readFields(input);
-    }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        vertexId.write(output);
-        msgList.write(output);
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public I getVertexId() {
-        return vertexId;
-    }
-
-    public MsgList<M> getMessageList() {
-        return msgList;
-    }
- }
+    implements Writable, Configurable {
+  /** Vertex id */
+  private I vertexId;
+  /** Message list corresponding to vertex id */
+  private MsgList<M> msgList;
+  /** Configuration from Configurable */
+  private Configuration conf;
+
+  /**
+   * Reflective constructor.
+   */
+  public VertexIdMessages() { }
+
+  /**
+   * Constructor used with creating initial values.
+   *
+   * @param vertexId Vertex id to be sent
+   * @param msgList Mesage list for the vertex id to be sent
+   */
+  public VertexIdMessages(I vertexId, MsgList<M> msgList) {
+    this.vertexId = vertexId;
+    this.msgList = msgList;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    vertexId = BspUtils.<I>createVertexIndex(getConf());
+    vertexId.readFields(input);
+    msgList = new MsgList<M>();
+    msgList.setConf(getConf());
+    msgList.readFields(input);
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    vertexId.write(output);
+    msgList.write(output);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the vertex id.
+   *
+   * @return Vertex id.
+   */
+  public I getVertexId() {
+    return vertexId;
+  }
+
+  /**
+   * Get the message list.
+   *
+   * @return Message list.
+   */
+  public MsgList<M> getMessageList() {
+    return msgList;
+  }
+}

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Thu Feb 16 22:12:31 2012
@@ -25,27 +25,35 @@ import org.apache.hadoop.io.WritableComp
  * Wrapper around {@link ArrayListWritable} that provides the list for
  * {@link VertexIdMessages}.
  *
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public class VertexIdMessagesList<I extends WritableComparable,
-        M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
-    /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 100L;
+    M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
+  /** Defining a layout version for a serializable class. */
+  private static final long serialVersionUID = 100L;
 
-    public VertexIdMessagesList() {
-        super();
-    }
+  /**
+   * Default constructor.
+   */
+  public VertexIdMessagesList() {
+    super();
+  }
 
-    public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
-        super(vertexIdMessagesList);
-    }
+  /**
+   * Copy constructor.
+   *
+   * @param vertexIdMessagesList List to be copied.
+   */
+  public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
+    super(vertexIdMessagesList);
+  }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setClass() {
-        setClass((Class<VertexIdMessages<I, M>>)
-                 (new VertexIdMessages<I, M>()).getClass());
-    }
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setClass() {
+    setClass((Class<VertexIdMessages<I, M>>)
+      (new VertexIdMessages<I, M>()).getClass());
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java Thu Feb 16 22:12:31 2012
@@ -33,24 +33,22 @@ import org.apache.hadoop.io.WritableComp
  * @param <M> Message value
  */
 @SuppressWarnings("rawtypes")
-public class VertexList<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends ArrayListWritable<BasicVertex<I, V, E, M>> {
-    /** Defining a layout version for a serializable class. */
-    private static final long serialVersionUID = 1000L;
+public class VertexList<I extends WritableComparable,
+    V extends Writable, E extends Writable,
+    M extends Writable>
+    extends ArrayListWritable<BasicVertex<I, V, E, M>> {
+  /** Defining a layout version for a serializable class. */
+  private static final long serialVersionUID = 1000L;
 
-    /**
-     * Default constructor for reflection
-     */
-    public VertexList() {}
+  /**
+   * Default constructor for reflection
+   */
+  public VertexList() { }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setClass() {
-        setClass((Class<BasicVertex<I, V, E, M>>)
-                 BspUtils.<I, V, E, M>getVertexClass(getConf()));
-    }
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setClass() {
+    setClass((Class<BasicVertex<I, V, E, M>>)
+        BspUtils.<I, V, E, M>getVertexClass(getConf()));
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java Thu Feb 16 22:12:31 2012
@@ -33,80 +33,78 @@ import java.util.Map;
 /**
  * Public interface for workers to do message communication
  *
- * @param <I extends Writable> vertex id
- * @param <V extends Writable> vertex value
- * @param <E extends Writable> edge value
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public interface WorkerCommunications<I extends WritableComparable,
-                                      V extends Writable,
-                                      E extends Writable,
-                                      M extends Writable> {
-    /**
-     * Fix changes to the workers and the mapping between partitions and
-     * workers.
-     */
-    void fixPartitionIdToSocketAddrMap();
-
-    /**
-     * Sends a message to destination vertex.
-     *
-     * @param id
-     * @param msg
-     */
-    void sendMessageReq(I id, M msg);
-
-    /**
-     * Sends a partition to the appropriate partition owner
-     *
-     * @param workerInfo Owner the vertices belong to
-     * @param partition Partition to send
-     */
-    void sendPartitionReq(WorkerInfo workerInfo,
-                          Partition<I, V, E, M> partition);
-
-    /**
-     * Sends a request to the appropriate vertex range owner to add an edge
-     *
-     * @param vertexIndex Index of the vertex to get the request
-     * @param edge Edge to be added
-     * @throws IOException
-     */
-    void addEdgeReq(I vertexIndex, Edge<I, E> edge) throws IOException;
-
-    /**
-     * Sends a request to the appropriate vertex range owner to remove an edge
-     *
-     * @param vertexIndex Index of the vertex to get the request
-     * @param destinationVertexIndex Index of the edge to be removed
-     * @throws IOException
-     */
-    void removeEdgeReq(I vertexIndex, I destinationVertexIndex)
-        throws IOException;
-
-    /**
-     * Sends a request to the appropriate vertex range owner to add a vertex
-     *
-     * @param vertex Vertex to be added
-     * @throws IOException
-     */
-    void addVertexReq(BasicVertex<I, V, E, M> vertex) throws IOException;
-
-    /**
-     * Sends a request to the appropriate vertex range owner to remove a vertex
-     *
-     * @param vertexIndex Index of the vertex to be removed
-     * @throws IOException
-     */
-    void removeVertexReq(I vertexIndex) throws IOException;
-
-    /**
-     * Get the vertices that were sent in the last iteration.  After getting
-     * the map, the user should synchronize with it to insure it
-     * is thread-safe.
-     *
-     * @return map of vertex ranges to vertices
-     */
-    Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
+    V extends Writable, E extends Writable, M extends Writable> {
+  /**
+   * Fix changes to the workers and the mapping between partitions and
+   * workers.
+   */
+  void fixPartitionIdToSocketAddrMap();
+
+  /**
+   * Sends a message to destination vertex.
+   *
+   * @param destVertexId Destination vertex id.
+   * @param message Message to send.
+   */
+  void sendMessageReq(I destVertexId, M message);
+
+  /**
+   * Sends a partition to the appropriate partition owner
+   *
+   * @param workerInfo Owner the vertices belong to
+   * @param partition Partition to send
+   */
+  void sendPartitionReq(WorkerInfo workerInfo,
+      Partition<I, V, E, M> partition);
+
+  /**
+   * Sends a request to the appropriate vertex range owner to add an edge
+   *
+   * @param vertexIndex Index of the vertex to get the request
+   * @param edge Edge to be added
+   * @throws IOException
+   */
+  void addEdgeReq(I vertexIndex, Edge<I, E> edge) throws IOException;
+
+  /**
+   * Sends a request to the appropriate vertex range owner to remove an edge
+   *
+   * @param vertexIndex Index of the vertex to get the request
+   * @param destinationVertexIndex Index of the edge to be removed
+   * @throws IOException
+   */
+  void removeEdgeReq(I vertexIndex, I destinationVertexIndex)
+    throws IOException;
+
+  /**
+   * Sends a request to the appropriate vertex range owner to add a vertex
+   *
+   * @param vertex Vertex to be added
+   * @throws IOException
+   */
+  void addVertexReq(BasicVertex<I, V, E, M> vertex) throws IOException;
+
+  /**
+   * Sends a request to the appropriate vertex range owner to remove a vertex
+   *
+   * @param vertexIndex Index of the vertex to be removed
+   * @throws IOException
+   */
+  void removeVertexReq(I vertexIndex) throws IOException;
+
+  /**
+   * Get the vertices that were sent in the last iteration.  After getting
+   * the map, the user should synchronize with it to insure it
+   * is thread-safe.
+   *
+   * @return map of vertex ranges to vertices
+   */
+  Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
 }

Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.giraph.bsp;
-
 /**
- *  State of the BSP application
+ * Package of communication related objects, RPC service.
  */
-public enum ApplicationState {
-    UNKNOWN, ///< Shouldn't be seen, just an initial state
-    START_SUPERSTEP, ///< Start from a desired superstep
-    FAILED, ///< Unrecoverable
-    FINISHED ///< Successful completion
-}
+package org.apache.giraph.comm;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.giraph.examples;
 
@@ -25,72 +25,72 @@ import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * Implementation of the HCC algorithm that identifies connected components and assigns each
- * vertex its "component identifier" (the smallest vertex id in the component)
+ * Implementation of the HCC algorithm that identifies connected components and
+ * assigns each vertex its "component identifier" (the smallest vertex id
+ * in the component)
  *
- * The idea behind the algorithm is very simple: propagate the smallest vertex id along the
- * edges to all vertices of a connected component. The number of supersteps necessary is
- * equal to the length of the maximum diameter of all components + 1
+ * The idea behind the algorithm is very simple: propagate the smallest
+ * vertex id along the edges to all vertices of a connected component. The
+ * number of supersteps necessary is equal to the length of the maximum
+ * diameter of all components + 1
  *
- * The original Hadoop-based variant of this algorithm was proposed by Kang, Charalampos
- * Tsourakakis and Faloutsos in "PEGASUS: Mining Peta-Scale Graphs", 2010
+ * The original Hadoop-based variant of this algorithm was proposed by Kang,
+ * Charalampos, Tsourakakis and Faloutsos in
+ * "PEGASUS: Mining Peta-Scale Graphs", 2010
  *
  * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
  */
 public class ConnectedComponentsVertex extends IntIntNullIntVertex {
-
-    /**
-     * Propagates the smallest vertex id to all neighbors. Will always choose to halt and only
-     * reactivate if a smaller id has been sent to it.
-     *
-     * @param messages
-     * @throws IOException
-     */
-    @Override
-    public void compute(Iterator<IntWritable> messages) throws IOException {
-
-        int currentComponent = getVertexValue().get();
-
-        // first superstep is special, because we can simply look at the neighbors
-        if (getSuperstep() == 0) {
-            for (Iterator<IntWritable> edges = iterator(); edges.hasNext();) {
-                int neighbor = edges.next().get();
-                if (neighbor < currentComponent) {
-                    currentComponent = neighbor;
-                }
-            }
-            // only need to send value if it is not the own id
-            if (currentComponent != getVertexValue().get()) {
-                setVertexValue(new IntWritable(currentComponent));
-                for (Iterator<IntWritable> edges = iterator();
-                        edges.hasNext();) {
-                    int neighbor = edges.next().get();
-                    if (neighbor > currentComponent) {
-                        sendMsg(new IntWritable(neighbor), getVertexValue());
-                    }
-                }
-            }
-
-            voteToHalt();
-            return;
+  /**
+   * Propagates the smallest vertex id to all neighbors. Will always choose to
+   * halt and only reactivate if a smaller id has been sent to it.
+   *
+   * @param messages Iterator of messages from the previous superstep.
+   * @throws IOException
+   */
+  @Override
+  public void compute(Iterator<IntWritable> messages) throws IOException {
+    int currentComponent = getVertexValue().get();
+
+    // First superstep is special, because we can simply look at the neighbors
+    if (getSuperstep() == 0) {
+      for (Iterator<IntWritable> edges = iterator(); edges.hasNext();) {
+        int neighbor = edges.next().get();
+        if (neighbor < currentComponent) {
+          currentComponent = neighbor;
         }
-
-        boolean changed = false;
-        // did we get a smaller id ?
-        while (messages.hasNext()) {
-            int candidateComponent = messages.next().get();
-            if (candidateComponent < currentComponent) {
-                currentComponent = candidateComponent;
-                changed = true;
-            }
+      }
+      // Only need to send value if it is not the own id
+      if (currentComponent != getVertexValue().get()) {
+        setVertexValue(new IntWritable(currentComponent));
+        for (Iterator<IntWritable> edges = iterator();
+            edges.hasNext();) {
+          int neighbor = edges.next().get();
+          if (neighbor > currentComponent) {
+            sendMsg(new IntWritable(neighbor), getVertexValue());
+          }
         }
+      }
 
-        // propagate new component id to the neighbors
-        if (changed) {
-            setVertexValue(new IntWritable(currentComponent));
-            sendMsgToAllEdges(getVertexValue());
-        }
-        voteToHalt();
+      voteToHalt();
+      return;
     }
 
+    boolean changed = false;
+    // did we get a smaller id ?
+    while (messages.hasNext()) {
+      int candidateComponent = messages.next().get();
+      if (candidateComponent < currentComponent) {
+        currentComponent = candidateComponent;
+        changed = true;
+      }
+    }
+
+    // propagate new component id to the neighbors
+    if (changed) {
+      setVertexValue(new IntWritable(currentComponent));
+      sendMsgToAllEdges(getVertexValue());
+    }
+    voteToHalt();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -32,22 +32,25 @@ import java.util.List;
 /**
  * This VertexInputFormat is meant for testing/debugging.  It simply generates
  * some vertex data that can be consumed by test applications.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class GeneratedVertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable,
-        M extends Writable>
-        extends VertexInputFormat<I, V, E, M> {
-
-    @Override
-    public List<InputSplit> getSplits(JobContext context, int numWorkers)
-        throws IOException, InterruptedException {
-         // This is meaningless, the VertexReader will generate all the test
-         // data.
-        List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
-        for (int i = 0; i < numWorkers; ++i) {
-            inputSplitList.add(new BspInputSplit(i, numWorkers));
-        }
-        return inputSplitList;
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> extends VertexInputFormat<I, V, E, M> {
+  @Override
+  public List<InputSplit> getSplits(JobContext context, int numWorkers)
+    throws IOException, InterruptedException {
+    // This is meaningless, the VertexReader will generate all the test
+    // data.
+    List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+    for (int i = 0; i < numWorkers; ++i) {
+      inputSplitList.add(new BspInputSplit(i, numWorkers));
     }
+    return inputSplitList;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Thu Feb 16 22:12:31 2012
@@ -34,53 +34,58 @@ import java.io.IOException;
  * @param <I> Vertex index value
  * @param <V> Vertex value
  * @param <E> Edge value
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
 public abstract class GeneratedVertexReader<
-        I extends WritableComparable, V extends Writable, E extends Writable,
-        M extends Writable>
-        implements VertexReader<I, V, E, M> {
-    /** Records read so far */
-    protected long recordsRead = 0;
-    /** Total records to read (on this split alone) */
-    protected long totalRecords = 0;
-    /** The input split from initialize(). */
-    protected BspInputSplit inputSplit = null;
-    /** Reverse the id order? */
-    protected boolean reverseIdOrder;
-
-    protected Configuration configuration = null;
-
-    public static final String READER_VERTICES =
-        "GeneratedVertexReader.reader_vertices";
-    public static final long DEFAULT_READER_VERTICES = 10;
-    public static final String REVERSE_ID_ORDER =
-        "GeneratedVertexReader.reverseIdOrder";
-    public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
-
-    public GeneratedVertexReader() {
-    }
-
-    @Override
-    final public void initialize(InputSplit inputSplit,
-                                 TaskAttemptContext context)
-            throws IOException {
-        configuration = context.getConfiguration();
-        totalRecords = configuration.getLong(
-            GeneratedVertexReader.READER_VERTICES,
-            GeneratedVertexReader.DEFAULT_READER_VERTICES);
-        reverseIdOrder = configuration.getBoolean(
-            GeneratedVertexReader.REVERSE_ID_ORDER,
-            GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
-        this.inputSplit = (BspInputSplit) inputSplit;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    final public float getProgress() throws IOException {
-        return recordsRead * 100.0f / totalRecords;
-    }
+    I extends WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable> implements VertexReader<I, V, E, M> {
+  /** Vertices produced by this reader */
+  public static final String READER_VERTICES =
+    "GeneratedVertexReader.reader_vertices";
+  /** Default vertices produced by this reader */
+  public static final long DEFAULT_READER_VERTICES = 10;
+  /** Reverse the order of the vertices? */
+  public static final String REVERSE_ID_ORDER =
+    "GeneratedVertexReader.reverseIdOrder";
+  /** Default ordering is not reversed */
+  public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+  /** Records read so far */
+  protected long recordsRead = 0;
+  /** Total records to read (on this split alone) */
+  protected long totalRecords = 0;
+  /** The input split from initialize(). */
+  protected BspInputSplit inputSplit = null;
+  /** Reverse the id order? */
+  protected boolean reverseIdOrder;
+  /** Saved configuration */
+  protected Configuration configuration = null;
+
+  /**
+   * Default constructor for reflection.
+   */
+  public GeneratedVertexReader() {
+  }
+
+  @Override
+  public final void initialize(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException {
+    configuration = context.getConfiguration();
+    totalRecords = configuration.getLong(
+        GeneratedVertexReader.READER_VERTICES,
+        GeneratedVertexReader.DEFAULT_READER_VERTICES);
+    reverseIdOrder = configuration.getBoolean(
+        GeneratedVertexReader.REVERSE_ID_ORDER,
+        GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+    this.inputSplit = (BspInputSplit) inputSplit;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public final float getProgress() throws IOException {
+    return recordsRead * 100.0f / totalRecords;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.giraph.examples;
 
@@ -37,61 +37,68 @@ import java.util.Map;
 import java.util.regex.Pattern;
 
 /**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for unweighted
- * graphs with int ids.
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with int ids.
  *
  * Each line consists of: vertex neighbor1 neighbor2 ...
  */
 public class IntIntNullIntTextInputFormat extends
-        TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
-        IntWritable> {
+    TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+    IntWritable> {
 
-    @Override
-    public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
-    createVertexReader(InputSplit split, TaskAttemptContext context)
-            throws IOException {
-        return new IntIntNullIntVertexReader(
-                textInputFormat.createRecordReader(split, context));
+  @Override
+  public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
+  createVertexReader(InputSplit split, TaskAttemptContext context)
+    throws IOException {
+    return new IntIntNullIntVertexReader(
+        textInputFormat.createRecordReader(split, context));
+  }
+
+  /**
+   * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
+   */
+  public static class IntIntNullIntVertexReader extends
+      TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
+      NullWritable, IntWritable> {
+    /** Separator of the vertex and neighbors */
+    private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+    /**
+     * Constructor with the line reader.
+     *
+     * @param lineReader Internal line reader.
+     */
+    public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
+    lineReader) {
+      super(lineReader);
     }
 
-    public static class IntIntNullIntVertexReader extends
-            TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
-                    NullWritable, IntWritable> {
-
-        private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
-        public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
-                lineReader) {
-            super(lineReader);
-        }
-
-        @Override
-        public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
-                getCurrentVertex() throws IOException, InterruptedException {
-            BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
-                    vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
-                    IntWritable>createVertex(getContext().getConfiguration());
-
-            String[] tokens = SEPARATOR.split(getRecordReader()
-                    .getCurrentValue().toString());
-            Map<IntWritable, NullWritable> edges =
-                    Maps.newHashMapWithExpectedSize(tokens.length - 1);
-            for (int n = 1; n < tokens.length; n++) {
-                edges.put(new IntWritable(Integer.parseInt(tokens[n])),
-                        NullWritable.get());
-            }
-
-            IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
-            vertex.initialize(vertexId, vertexId, edges,
-                    Lists.<IntWritable>newArrayList());
-
-            return vertex;
-        }
-
-        @Override
-        public boolean nextVertex() throws IOException, InterruptedException {
-            return getRecordReader().nextKeyValue();
-        }
+    @Override
+    public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+    getCurrentVertex() throws IOException, InterruptedException {
+      BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+      vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
+      IntWritable>createVertex(getContext().getConfiguration());
+
+      String[] tokens = SEPARATOR.split(getRecordReader()
+          .getCurrentValue().toString());
+      Map<IntWritable, NullWritable> edges =
+          Maps.newHashMapWithExpectedSize(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.put(new IntWritable(Integer.parseInt(tokens[n])),
+            NullWritable.get());
+      }
+
+      IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
+      vertex.initialize(vertexId, vertexId, edges,
+          Lists.<IntWritable>newArrayList());
+
+      return vertex;
     }
 
+    @Override
+    public boolean nextVertex() throws IOException, InterruptedException {
+      return getRecordReader().nextKeyValue();
+    }
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java Thu Feb 16 22:12:31 2012
@@ -26,30 +26,35 @@ import org.apache.giraph.graph.Aggregato
  * Aggregator for summing up values.
  */
 public class LongSumAggregator implements Aggregator<LongWritable> {
-    /** Internal sum */
-    private long sum = 0;
+  /** Internal sum */
+  private long sum = 0;
 
-    public void aggregate(long value) {
-        sum += value;
-    }
+  /**
+   * Aggregate with a primitive long.
+   *
+   * @param value Long value to aggregate.
+   */
+  public void aggregate(long value) {
+    sum += value;
+  }
 
-    @Override
-    public void aggregate(LongWritable value) {
-        sum += value.get();
-    }
+  @Override
+  public void aggregate(LongWritable value) {
+    sum += value.get();
+  }
 
-    @Override
-    public void setAggregatedValue(LongWritable value) {
-        sum = value.get();
-    }
+  @Override
+  public void setAggregatedValue(LongWritable value) {
+    sum = value.get();
+  }
 
-    @Override
-    public LongWritable getAggregatedValue() {
-        return new LongWritable(sum);
-    }
+  @Override
+  public LongWritable getAggregatedValue() {
+    return new LongWritable(sum);
+  }
 
-    @Override
-    public LongWritable createAggregatedValue() {
-        return new LongWritable();
-    }
+  @Override
+  public LongWritable createAggregatedValue() {
+    return new LongWritable();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java Thu Feb 16 22:12:31 2012
@@ -25,29 +25,31 @@ import org.apache.giraph.graph.Aggregato
 /**
  * Aggregator for getting max value.
  *
- **/
-
+ */
 public class MaxAggregator implements Aggregator<DoubleWritable> {
-
+  /** Saved maximum value */
   private double max = Double.MIN_VALUE;
 
+  @Override
   public void aggregate(DoubleWritable value) {
-      double val = value.get();
-      if (val > max) {
-          max = val;
-      }
+    double val = value.get();
+    if (val > max) {
+      max = val;
+    }
   }
 
+  @Override
   public void setAggregatedValue(DoubleWritable value) {
-      max = value.get();
+    max = value.get();
   }
 
+  @Override
   public DoubleWritable getAggregatedValue() {
-      return new DoubleWritable(max);
+    return new DoubleWritable(max);
   }
 
+  @Override
   public DoubleWritable createAggregatedValue() {
-      return new DoubleWritable();
+    return new DoubleWritable();
   }
-
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java Thu Feb 16 22:12:31 2012
@@ -24,30 +24,32 @@ import org.apache.giraph.graph.Aggregato
 
 /**
  * Aggregator for getting min value.
- *
- **/
-
+ */
 public class MinAggregator implements Aggregator<DoubleWritable> {
-
+  /** Internal aggregator */
   private double min = Double.MAX_VALUE;
 
+  @Override
   public void aggregate(DoubleWritable value) {
-      double val = value.get();
-      if (val < min) {
-          min = val;
-      }
+    double val = value.get();
+    if (val < min) {
+      min = val;
+    }
   }
 
+  @Override
   public void setAggregatedValue(DoubleWritable value) {
-      min = value.get();
+    min = value.get();
   }
 
+  @Override
   public DoubleWritable getAggregatedValue() {
-      return new DoubleWritable(min);
+    return new DoubleWritable(min);
   }
 
+  @Override
   public DoubleWritable createAggregatedValue() {
-      return new DoubleWritable();
+    return new DoubleWritable();
   }
 
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.giraph.examples;
 
@@ -29,20 +29,19 @@ import java.util.List;
  * {@link VertexCombiner} that finds the minimum {@link IntWritable}
  */
 public class MinimumIntCombiner
-        extends VertexCombiner<IntWritable, IntWritable> {
-
-    @Override
-    public Iterable<IntWritable> combine(IntWritable target,
-    		Iterable<IntWritable> messages) throws IOException {
-        int minimum = Integer.MAX_VALUE;
-        for (IntWritable message : messages) {
-            if (message.get() < minimum) {
-                minimum = message.get();
-            }
-        }
-        List<IntWritable> value = new ArrayList<IntWritable>();
-        value.add(new IntWritable(minimum));
-        
-        return value;
+    extends VertexCombiner<IntWritable, IntWritable> {
+  @Override
+  public Iterable<IntWritable> combine(IntWritable target,
+      Iterable<IntWritable> messages) throws IOException {
+    int minimum = Integer.MAX_VALUE;
+    for (IntWritable message : messages) {
+      if (message.get() < minimum) {
+        minimum = message.get();
+      }
     }
+    List<IntWritable> value = new ArrayList<IntWritable>();
+    value.add(new IntWritable(minimum));
+
+    return value;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -37,32 +37,37 @@ import org.apache.hadoop.mapreduce.Mappe
  * directory.
  */
 public class SimpleAggregatorWriter implements AggregatorWriter {
-    /** the name of the file we wrote to */
-    public static String filename;
-    private FSDataOutputStream output;
-    
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void initialize(Context context, long applicationAttempt)
-            throws IOException {
-        filename = "aggregatedValues_"+applicationAttempt;
-        Path p = new Path(filename);
-        FileSystem fs = FileSystem.get(context.getConfiguration());
-        output = fs.create(p, true);
-    }
+  /** Name of the file we wrote to */
+  private static String FILENAME;
+  /** Saved output stream to write to */
+  private FSDataOutputStream output;
 
-    @Override
-    public void writeAggregator(Map<String, Aggregator<Writable>> map,
-            long superstep) throws IOException {
+  public static String getFilename() {
+    return FILENAME;
+  }
 
-        for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
-            aggregator.getValue().getAggregatedValue().write(output);
-        }
-        output.flush();
-    }
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void initialize(Context context, long applicationAttempt)
+    throws IOException {
+    FILENAME = "aggregatedValues_" + applicationAttempt;
+    Path p = new Path(FILENAME);
+    FileSystem fs = FileSystem.get(context.getConfiguration());
+    output = fs.create(p, true);
+  }
 
-    @Override
-    public void close() throws IOException {
-        output.close();
+  @Override
+  public void writeAggregator(Map<String, Aggregator<Writable>> map,
+      long superstep) throws IOException {
+
+    for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
+      aggregator.getValue().getAggregatedValue().write(output);
     }
+    output.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    output.close();
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Feb 16 22:12:31 2012
@@ -18,8 +18,14 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.commons.cli.*;
-import org.apache.giraph.graph.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.FloatWritable;
@@ -38,208 +44,222 @@ import java.util.Iterator;
  * can also test automated checkpoint restarts.
  */
 public class SimpleCheckpointVertex extends
-        EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
-        implements Tool {
-    private static Logger LOG =
-        Logger.getLogger(SimpleCheckpointVertex.class);
-    /** Configuration */
-    private Configuration conf;
-    /** Which superstep to cause the worker to fail */
-    public final int faultingSuperstep = 4;
-    /** Vertex id to fault on */
-    public final long faultingVertexId = 1;
-    /** Dynamically set number of supersteps */
-    public static final String SUPERSTEP_COUNT =
-        "simpleCheckpointVertex.superstepCount";
-    /** Should fault? */
-    public static final String ENABLE_FAULT=
-        "simpleCheckpointVertex.enableFault";
+    EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
+    implements Tool {
+  /** Which superstep to cause the worker to fail */
+  public static final int FAULTING_SUPERSTEP = 4;
+  /** Vertex id to fault on */
+  public static final long FAULTING_VERTEX_ID = 1;
+  /** Dynamically set number of supersteps */
+  public static final String SUPERSTEP_COUNT =
+      "simpleCheckpointVertex.superstepCount";
+  /** Should fault? */
+  public static final String ENABLE_FAULT =
+      "simpleCheckpointVertex.enableFault";
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleCheckpointVertex.class);
+  /** Configuration */
+  private Configuration conf;
+
+  @Override
+  public void compute(Iterator<FloatWritable> msgIterator) {
+    SimpleCheckpointVertexWorkerContext workerContext =
+        (SimpleCheckpointVertexWorkerContext) getWorkerContext();
+
+    LongSumAggregator sumAggregator = (LongSumAggregator)
+        getAggregator(LongSumAggregator.class.getName());
+
+    boolean enableFault = workerContext.getEnableFault();
+    int supersteps = workerContext.getSupersteps();
+
+    if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
+        (getContext().getTaskAttemptID().getId() == 0) &&
+        (getVertexId().get() == FAULTING_VERTEX_ID)) {
+      LOG.info("compute: Forced a fault on the first " +
+          "attempt of superstep " +
+          FAULTING_SUPERSTEP + " and vertex id " +
+          FAULTING_VERTEX_ID);
+      System.exit(-1);
+    }
+    if (getSuperstep() > supersteps) {
+      voteToHalt();
+      return;
+    }
+    LOG.info("compute: " + sumAggregator);
+    sumAggregator.aggregate(getVertexId().get());
+    LOG.info("compute: sum = " +
+        sumAggregator.getAggregatedValue().get() +
+        " for vertex " + getVertexId());
+    float msgValue = 0.0f;
+    while (msgIterator.hasNext()) {
+      float curMsgValue = msgIterator.next().get();
+      msgValue += curMsgValue;
+      LOG.info("compute: got msgValue = " + curMsgValue +
+          " for vertex " + getVertexId() +
+          " on superstep " + getSuperstep());
+    }
+    int vertexValue = getVertexValue().get();
+    setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+    LOG.info("compute: vertex " + getVertexId() +
+        " has value " + getVertexValue() +
+        " on superstep " + getSuperstep());
+    for (LongWritable targetVertexId : this) {
+      FloatWritable edgeValue = getEdgeValue(targetVertexId);
+      LOG.info("compute: vertex " + getVertexId() +
+          " sending edgeValue " + edgeValue +
+          " vertexValue " + vertexValue +
+          " total " + (edgeValue.get() +
+              (float) vertexValue) +
+              " to vertex " + targetVertexId +
+              " on superstep " + getSuperstep());
+      edgeValue.set(edgeValue.get() + (float) vertexValue);
+      addEdge(targetVertexId, edgeValue);
+      sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
+    }
+  }
+
+  /**
+   * Worker context associated with {@link SimpleCheckpointVertex}.
+   */
+  public static class SimpleCheckpointVertexWorkerContext
+      extends WorkerContext {
+    /** Filename to indicate whether a fault was found */
+    public static final String FAULT_FILE = "/tmp/faultFile";
+    /** User can access this after the application finishes if local */
+    private static long FINAL_SUM;
+    /** Number of supersteps to run (6 by default) */
+    private int supersteps = 6;
+    /** Enable the fault at the particular vertex id and superstep? */
+    private boolean enableFault = false;
 
-    @Override
-    public void compute(Iterator<FloatWritable> msgIterator) {
-    	SimpleCheckpointVertexWorkerContext workerContext =
-    		(SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
-        LongSumAggregator sumAggregator = (LongSumAggregator)
-            getAggregator(LongSumAggregator.class.getName());
-
-        boolean enableFault = workerContext.getEnableFault();
-        int supersteps = workerContext.getSupersteps();
-
-        if (enableFault && (getSuperstep() == faultingSuperstep) &&
-                (getContext().getTaskAttemptID().getId() == 0) &&
-                (getVertexId().get() == faultingVertexId)) {
-            System.out.println("compute: Forced a fault on the first " +
-                               "attempt of superstep " +
-                               faultingSuperstep + " and vertex id " +
-                               faultingVertexId);
-            System.exit(-1);
-        }
-        if (getSuperstep() > supersteps) {
-            voteToHalt();
-            return;
-        }
-        System.out.println("compute: " + sumAggregator);
-        sumAggregator.aggregate(getVertexId().get());
-        System.out.println("compute: sum = " +
-                           sumAggregator.getAggregatedValue().get() +
-                           " for vertex " + getVertexId());
-        float msgValue = 0.0f;
-        while (msgIterator.hasNext()) {
-            float curMsgValue = msgIterator.next().get();
-            msgValue += curMsgValue;
-            System.out.println("compute: got msgValue = " + curMsgValue +
-                               " for vertex " + getVertexId() +
-                               " on superstep " + getSuperstep());
-        }
-        int vertexValue = getVertexValue().get();
-        setVertexValue(new IntWritable(vertexValue + (int) msgValue));
-        System.out.println("compute: vertex " + getVertexId() +
-                           " has value " + getVertexValue() +
-                           " on superstep " + getSuperstep());
-        for (LongWritable targetVertexId : this) {
-            FloatWritable edgeValue = getEdgeValue(targetVertexId);
-            System.out.println("compute: vertex " + getVertexId() +
-                               " sending edgeValue " + edgeValue +
-                               " vertexValue " + vertexValue +
-                               " total " + (edgeValue.get() +
-                               (float) vertexValue) +
-                               " to vertex " + targetVertexId +
-                               " on superstep " + getSuperstep());
-            edgeValue.set(edgeValue.get() + (float) vertexValue);
-            addEdge(targetVertexId, edgeValue);
-            sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
-        }
-    }
-
-    public static class SimpleCheckpointVertexWorkerContext
-            extends WorkerContext {
-        /** User can access this after the application finishes if local */
-        public static long finalSum;
-        /** Number of supersteps to run (6 by default) */
-        private int supersteps = 6;
-        /** Filename to indicate whether a fault was found */
-        public final String faultFile = "/tmp/faultFile";
-        /** Enable the fault at the particular vertex id and superstep? */
-        private boolean enableFault = false;
-
-		@Override
-		public void preApplication()
-		        throws InstantiationException, IllegalAccessException {
-		    registerAggregator(LongSumAggregator.class.getName(),
-					LongSumAggregator.class);
-		    LongSumAggregator sumAggregator = (LongSumAggregator)
-		    getAggregator(LongSumAggregator.class.getName());
-		    sumAggregator.setAggregatedValue(new LongWritable(0));
-		    supersteps = getContext().getConfiguration()
-		        .getInt(SUPERSTEP_COUNT, supersteps);
-		    enableFault = getContext().getConfiguration()
-		        .getBoolean(ENABLE_FAULT, false);
-		}
-
-		@Override
-		public void postApplication() {
-		    LongSumAggregator sumAggregator = (LongSumAggregator)
-		        getAggregator(LongSumAggregator.class.getName());
-		    finalSum = sumAggregator.getAggregatedValue().get();
-		    LOG.info("finalSum="+ finalSum);
-		}
-
-		@Override
-		public void preSuperstep() {
-	        useAggregator(LongSumAggregator.class.getName());
-		}
-
-		@Override
-		public void postSuperstep() { }
-
-		public int getSupersteps() {
-		    return this.supersteps;
-		}
-
-		public boolean getEnableFault() {
-		    return this.enableFault;
-		}
+    public static long getFinalSum() {
+      return FINAL_SUM;
     }
 
     @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        options.addOption("h", "help", false, "Help");
-        options.addOption("v", "verbose", false, "Verbose");
-        options.addOption("w",
-                          "workers",
-                          true,
-                          "Number of workers");
-        options.addOption("s",
-                          "supersteps",
-                          true,
-                          "Supersteps to execute before finishing");
-        options.addOption("w",
-                          "workers",
-                          true,
-                          "Minimum number of workers");
-        options.addOption("o",
-                          "outputDirectory",
-                          true,
-                          "Output directory");
-        HelpFormatter formatter = new HelpFormatter();
-        if (args.length == 0) {
-            formatter.printHelp(getClass().getName(), options, true);
-            return 0;
-        }
-        CommandLineParser parser = new PosixParser();
-        CommandLine cmd = parser.parse(options, args);
-        if (cmd.hasOption('h')) {
-            formatter.printHelp(getClass().getName(), options, true);
-            return 0;
-        }
-        if (!cmd.hasOption('w')) {
-            System.out.println("Need to choose the number of workers (-w)");
-            return -1;
-        }
-        if (!cmd.hasOption('o')) {
-            System.out.println("Need to set the output directory (-o)");
-            return -1;
-        }
-
-        GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
-        bspJob.setVertexClass(getClass());
-        bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
-        bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
-        bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
-        int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
-        int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
-        bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
-
-        FileOutputFormat.setOutputPath(bspJob,
-                                       new Path(cmd.getOptionValue('o')));
-        boolean verbose = false;
-        if (cmd.hasOption('v')) {
-            verbose = true;
-        }
-        if (cmd.hasOption('s')) {
-            getConf().setInt(SUPERSTEP_COUNT,
-                             Integer.parseInt(cmd.getOptionValue('s')));
-        }
-        if (bspJob.run(verbose) == true) {
-            return 0;
-        } else {
-            return -1;
-        }
+    public void preApplication()
+      throws InstantiationException, IllegalAccessException {
+      registerAggregator(LongSumAggregator.class.getName(),
+          LongSumAggregator.class);
+      LongSumAggregator sumAggregator = (LongSumAggregator)
+          getAggregator(LongSumAggregator.class.getName());
+      sumAggregator.setAggregatedValue(new LongWritable(0));
+      supersteps = getContext().getConfiguration()
+          .getInt(SUPERSTEP_COUNT, supersteps);
+      enableFault = getContext().getConfiguration()
+          .getBoolean(ENABLE_FAULT, false);
     }
 
-    public static void main(String[] args) throws Exception {
-        System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
+    @Override
+    public void postApplication() {
+      LongSumAggregator sumAggregator = (LongSumAggregator)
+          getAggregator(LongSumAggregator.class.getName());
+      FINAL_SUM = sumAggregator.getAggregatedValue().get();
+      LOG.info("FINAL_SUM=" + FINAL_SUM);
     }
 
     @Override
-    public Configuration getConf() {
-        return conf;
+    public void preSuperstep() {
+      useAggregator(LongSumAggregator.class.getName());
     }
 
     @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
+    public void postSuperstep() { }
+
+    public int getSupersteps() {
+      return this.supersteps;
+    }
+
+    public boolean getEnableFault() {
+      return this.enableFault;
     }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Options options = new Options();
+    options.addOption("h", "help", false, "Help");
+    options.addOption("v", "verbose", false, "Verbose");
+    options.addOption("w",
+        "workers",
+        true,
+        "Number of workers");
+    options.addOption("s",
+        "supersteps",
+        true,
+        "Supersteps to execute before finishing");
+    options.addOption("w",
+        "workers",
+        true,
+        "Minimum number of workers");
+    options.addOption("o",
+        "outputDirectory",
+        true,
+        "Output directory");
+    HelpFormatter formatter = new HelpFormatter();
+    if (args.length == 0) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = parser.parse(options, args);
+    if (cmd.hasOption('h')) {
+      formatter.printHelp(getClass().getName(), options, true);
+      return 0;
+    }
+    if (!cmd.hasOption('w')) {
+      LOG.info("Need to choose the number of workers (-w)");
+      return -1;
+    }
+    if (!cmd.hasOption('o')) {
+      LOG.info("Need to set the output directory (-o)");
+      return -1;
+    }
+
+    GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+    bspJob.setVertexClass(getClass());
+    bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
+    bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
+    bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+    int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+    int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+    bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+
+    FileOutputFormat.setOutputPath(bspJob,
+        new Path(cmd.getOptionValue('o')));
+    boolean verbose = false;
+    if (cmd.hasOption('v')) {
+      verbose = true;
+    }
+    if (cmd.hasOption('s')) {
+      getConf().setInt(SUPERSTEP_COUNT,
+          Integer.parseInt(cmd.getOptionValue('s')));
+    }
+    if (bspJob.run(verbose)) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+
+  /**
+   * Executable from the command line.
+   *
+   * @param args Command line args.
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
 }

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java Thu Feb 16 22:12:31 2012
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
 
 import org.apache.giraph.graph.EdgeListVertex;
 
@@ -30,36 +31,38 @@ import org.apache.giraph.graph.EdgeListV
  * Test whether messages can go through a combiner.
  */
 public class SimpleCombinerVertex extends
-        EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
-    @Override
-    public void compute(Iterator<IntWritable> msgIterator) {
-        if (getVertexId().equals(new LongWritable(2))) {
-            sendMsg(new LongWritable(1), new IntWritable(101));
-            sendMsg(new LongWritable(1), new IntWritable(102));
-            sendMsg(new LongWritable(1), new IntWritable(103));
-        }
-        if (!getVertexId().equals(new LongWritable(1))) {
-            voteToHalt();
-        }
-        else {
-            // Check the messages
-            int sum = 0;
-            int num = 0;
-            while (msgIterator != null && msgIterator.hasNext()) {
-                sum += msgIterator.next().get();
-                num++;
-            }
-            System.out.println("TestCombinerVertex: Received a sum of " + sum +
-            " (should have 306 with a single message value)");
+    EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+  /** Class logger */
+  private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
 
-            if (num == 1 && sum == 306) {
-                voteToHalt();
-            }
-        }
-        if (getSuperstep() > 3) {
-            throw new IllegalStateException(
-                "TestCombinerVertex: Vertex 1 failed to receive " +
-                "messages in time");
-        }
+  @Override
+  public void compute(Iterator<IntWritable> msgIterator) {
+    if (getVertexId().equals(new LongWritable(2))) {
+      sendMsg(new LongWritable(1), new IntWritable(101));
+      sendMsg(new LongWritable(1), new IntWritable(102));
+      sendMsg(new LongWritable(1), new IntWritable(103));
     }
+    if (!getVertexId().equals(new LongWritable(1))) {
+      voteToHalt();
+    } else {
+      // Check the messages
+      int sum = 0;
+      int num = 0;
+      while (msgIterator != null && msgIterator.hasNext()) {
+        sum += msgIterator.next().get();
+        num++;
+      }
+      LOG.info("TestCombinerVertex: Received a sum of " + sum +
+          " (should have 306 with a single message value)");
+
+      if (num == 1 && sum == 306) {
+        voteToHalt();
+      }
+    }
+    if (getSuperstep() > 3) {
+      throw new IllegalStateException(
+          "TestCombinerVertex: Vertex 1 failed to receive " +
+          "messages in time");
+    }
+  }
 }