You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/02/16 23:12:36 UTC
svn commit: r1245205 [4/18] - in /incubator/giraph/trunk: ./
src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/
src/main/java/org/apache/giraph/examples...
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java Thu Feb 16 22:12:31 2012
@@ -34,100 +34,98 @@ import org.apache.hadoop.ipc.VersionedPr
/**
* Basic interface for communication between workers.
*
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
/*if_not[HADOOP]
else[HADOOP]*/
@TokenInfo(BspTokenSelector.class)
/*end[HADOOP]*/
-public interface CommunicationsInterface<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends VersionedProtocol {
-
- /**
- * Interface Version History
- *
- * 0 - First Version
- */
- static final long versionID = 0L;
-
- /**
- * Adds incoming message.
- *
- * @param vertexIndex
- * @param msg
- * @throws IOException
- */
- void putMsg(I vertexIndex, M msg) throws IOException;
-
- /**
- * Adds incoming message list.
- *
- * @param vertexIndex Vertex index where the message are added
- * @param msgList messages added
- * @throws IOException
- */
- void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
-
- /**
- * Adds a list of vertex ids and their respective message lists.
- *
- * @param vertexIdMessagesList messages to be added
- * @throws IOException
- */
- void putVertexIdMessagesList(
- VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
-
- /**
- * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
- *
- * @param partitionId Partition id of the vertices to be added.
- * @param vertexList List of vertices to add
- */
- void putVertexList(int partitionId,
- VertexList<I, V, E, M> vertexList) throws IOException;
-
- /**
- * Add an edge to a remote vertex
- *
- * @param vertexIndex Vertex index where the edge is added
- * @param edge Edge to be added
- * @throws IOException
- */
- void addEdge(I vertexIndex, Edge<I, E> edge) throws IOException;
-
- /**
- * Remove an edge on a remote vertex
- *
- * @param vertexIndex Vertex index where the edge is added
- * @param destinationVertexIndex Edge vertex index to be removed
- * @throws IOException
- */
- void removeEdge(I vertexIndex, I destinationVertexIndex) throws IOException;
-
- /**
- * Add a remote vertex
- *
- * @param vertex Vertex that will be added
- * @throws IOException
- */
- void addVertex(BasicVertex<I, V, E, M> vertex) throws IOException;
-
- /**
- * Removed a remote vertex
- *
- * @param vertexIndex Vertex index representing vertex to be removed
- * @throws IOException
- */
- void removeVertex(I vertexIndex) throws IOException;
-
- /**
- * @return The name of this worker in the format "hostname:port".
- */
- String getName();
+public interface CommunicationsInterface<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends VersionedProtocol {
+ /**
+ * Interface Version History
+ *
+ * 0 - First Version
+ */
+ long VERSION_ID = 0L;
+
+ /**
+ * Adds incoming message.
+ *
+ * @param vertexIndex Destination vertex index.
+ * @param message Message to store.
+ * @throws IOException
+ */
+ void putMsg(I vertexIndex, M message) throws IOException;
+
+ /**
+ * Adds incoming message list.
+ *
+ * @param vertexIndex Vertex index where the message are added
+ * @param msgList messages added
+ * @throws IOException
+ */
+ void putMsgList(I vertexIndex, MsgList<M> msgList) throws IOException;
+
+ /**
+ * Adds a list of vertex ids and their respective message lists.
+ *
+ * @param vertexIdMessagesList messages to be added
+ * @throws IOException
+ */
+ void putVertexIdMessagesList(
+ VertexIdMessagesList<I, M> vertexIdMessagesList) throws IOException;
+
+ /**
+ * Adds vertex list (index, value, edges, etc.) to the appropriate worker.
+ *
+ * @param partitionId Partition id of the vertices to be added.
+ * @param vertexList List of vertices to add
+ */
+ void putVertexList(int partitionId,
+ VertexList<I, V, E, M> vertexList) throws IOException;
+
+ /**
+ * Add an edge to a remote vertex
+ *
+ * @param vertexIndex Vertex index where the edge is added
+ * @param edge Edge to be added
+ * @throws IOException
+ */
+ void addEdge(I vertexIndex, Edge<I, E> edge) throws IOException;
+
+ /**
+ * Remove an edge on a remote vertex
+ *
+ * @param vertexIndex Vertex index where the edge is added
+ * @param destinationVertexIndex Edge vertex index to be removed
+ * @throws IOException
+ */
+ void removeEdge(I vertexIndex, I destinationVertexIndex) throws IOException;
+
+ /**
+ * Add a remote vertex
+ *
+ * @param vertex Vertex that will be added
+ * @throws IOException
+ */
+ void addVertex(BasicVertex<I, V, E, M> vertex) throws IOException;
+
+ /**
+ * Removed a remote vertex
+ *
+ * @param vertexIndex Vertex index representing vertex to be removed
+ * @throws IOException
+ */
+ void removeVertex(I vertexIndex) throws IOException;
+
+ /**
+ * @return The name of this worker in the format "hostname:port".
+ */
+ String getName();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java Thu Feb 16 22:12:31 2012
@@ -27,22 +27,29 @@ import org.apache.hadoop.io.Writable;
*
* @param <M> message type
*/
-public class MsgList<M extends Writable>
- extends ArrayListWritable<M> {
- /** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 100L;
+public class MsgList<M extends Writable> extends ArrayListWritable<M> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 100L;
- public MsgList() {
- super();
- }
-
- public MsgList(MsgList<M> msgList) {
- super(msgList);
- }
+ /**
+ * Default constructor.
+ */
+ public MsgList() {
+ super();
+ }
- @SuppressWarnings("unchecked")
- @Override
- public void setClass() {
- setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
- }
+ /**
+ * Copy constructor.
+ *
+ * @param msgList List of messages for writing.
+ */
+ public MsgList(MsgList<M> msgList) {
+ super(msgList);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() {
+ setClass((Class<M>) BspUtils.getMessageValueClass(getConf()));
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java Thu Feb 16 22:12:31 2012
@@ -21,7 +21,6 @@ package org.apache.giraph.comm;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
/*if_not[HADOOP]
else[HADOOP]*/
@@ -48,119 +47,155 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.mapreduce.Mapper;
+/**
+ * Used to implement abstract {@link BasicRPCCommunications} methods.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
@SuppressWarnings("rawtypes")
-public class RPCCommunications<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
-/*if_not[HADOOP]
-extends BasicRPCCommunications<I, V, E, M, Object> {
-else[HADOOP]*/
- extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
-/*end[HADOOP]*/
-
- /** Class logger */
- public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+public class RPCCommunications<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ /*if_not[HADOOP]
+ extends BasicRPCCommunications<I, V, E, M, Object> {
+ else[HADOOP]*/
+ extends BasicRPCCommunications<I, V, E, M, Token<JobTokenIdentifier>> {
+ /*end[HADOOP]*/
+
+ /** Class logger */
+ public static final Logger LOG = Logger.getLogger(RPCCommunications.class);
+
+ /**
+ * Constructor.
+ *
+ * @param context Context to be saved.
+ * @param service Server worker.
+ * @param graphState Graph state from infrastructure.
+ * @throws IOException
+ * @throws UnknownHostException
+ * @throws InterruptedException
+ */
+ public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
+ CentralizedServiceWorker<I, V, E, M> service,
+ GraphState<I, V, E, M> graphState) throws
+ IOException, InterruptedException {
+ super(context, service);
+ }
- public RPCCommunications(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service,
- GraphState<I, V, E, M> graphState)
- throws IOException, UnknownHostException, InterruptedException {
- super(context, service);
- }
-
-/*if_not[HADOOP]
+ /*if_not[HADOOP]
protected Object createJobToken() throws IOException {
return null;
}
-else[HADOOP]*/
- protected Token<JobTokenIdentifier> createJobToken() throws IOException {
- String localJobTokenFile = System.getenv().get(
- UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
- if (localJobTokenFile != null) {
- Credentials credentials =
- TokenCache.loadTokens(localJobTokenFile, conf);
- return TokenCache.getJobToken(credentials);
- }
- return null;
- }
-/*end[HADOOP]*/
-
- protected Server getRPCServer(
- InetSocketAddress myAddress, int numHandlers, String jobId,
-/*if_not[HADOOP]
+ else[HADOOP]*/
+ /**
+ * Create the job token.
+ *
+ * @return Job token.
+ */
+ protected Token<JobTokenIdentifier> createJobToken() throws IOException {
+ String localJobTokenFile = System.getenv().get(
+ UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if (localJobTokenFile != null) {
+ Credentials credentials =
+ TokenCache.loadTokens(localJobTokenFile, conf);
+ return TokenCache.getJobToken(credentials);
+ }
+ return null;
+ }
+ /*end[HADOOP]*/
+
+ /**
+ * Get the RPC server.
+ *
+ * @param myAddress My address.
+ * @param numHandlers Number of handler threads.
+ * @param jobId Job id.
+ * @param jt Jobtoken indentifier.
+ * @return RPC server.
+ */
+ protected Server getRPCServer(
+ InetSocketAddress myAddress, int numHandlers, String jobId,
+ /*if_not[HADOOP]
Object jt) throws IOException {
return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
numHandlers, false, conf);
}
-else[HADOOP]*/
- Token<JobTokenIdentifier> jt) throws IOException {
- @SuppressWarnings("deprecation")
- String hadoopSecurityAuthorization =
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
- if (conf.getBoolean(
- hadoopSecurityAuthorization,
- false)) {
- ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
- }
- JobTokenSecretManager jobTokenSecretManager =
- new JobTokenSecretManager();
- if (jt != null) { //could be null in the case of some unit tests
- jobTokenSecretManager.addTokenForJob(jobId, jt);
- if (LOG.isInfoEnabled()) {
- LOG.info("getRPCServer: Added jobToken " + jt);
- }
- }
- return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
- numHandlers, false, conf, jobTokenSecretManager);
- }
-/*end[HADOOP]*/
-
- protected CommunicationsInterface<I, V, E, M> getRPCProxy(
- final InetSocketAddress addr,
- String jobId,
-/*if_not[HADOOP]
- Object jt)
-else[HADOOP]*/
- Token<JobTokenIdentifier> jt)
-/*end[HADOOP]*/
- throws IOException, InterruptedException {
- final Configuration config = new Configuration(conf);
-
-/*if_not[HADOOP]
+ else[HADOOP]*/
+ Token<JobTokenIdentifier> jt) throws IOException {
+ @SuppressWarnings("deprecation")
+ String hadoopSecurityAuthorization =
+ ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG;
+ if (conf.getBoolean(
+ hadoopSecurityAuthorization,
+ false)) {
+ ServiceAuthorizationManager.refresh(conf, new BspPolicyProvider());
+ }
+ JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+ if (jt != null) { //could be null in the case of some unit tests
+ jobTokenSecretManager.addTokenForJob(jobId, jt);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getRPCServer: Added jobToken " + jt);
+ }
+ }
+ return RPC.getServer(this, myAddress.getHostName(), myAddress.getPort(),
+ numHandlers, false, conf, jobTokenSecretManager);
+ }
+ /*end[HADOOP]*/
+
+ /**
+ * Get the RPC proxy.
+ *
+ * @param addr Address of the RPC server.
+ * @param jobId Job id.
+ * @param jt Job token.
+ * @return Proxy of the RPC server.
+ */
+ protected CommunicationsInterface<I, V, E, M> getRPCProxy(
+ final InetSocketAddress addr,
+ String jobId,
+ /*if_not[HADOOP]
+ Object jt)
+ else[HADOOP]*/
+ Token<JobTokenIdentifier> jt)
+ /*end[HADOOP]*/
+ throws IOException, InterruptedException {
+ final Configuration config = new Configuration(conf);
+ /*if_not[HADOOP]
@SuppressWarnings("unchecked")
CommunicationsInterface<I, V, E, M> proxy =
(CommunicationsInterface<I, V, E, M>)RPC.getProxy(
CommunicationsInterface.class, versionID, addr, config);
return proxy;
-else[HADOOP]*/
- if (jt == null) {
- @SuppressWarnings("unchecked")
- CommunicationsInterface<I, V, E, M> proxy =
- (CommunicationsInterface<I, V, E, M>)RPC.getProxy(
- CommunicationsInterface.class, versionID, addr, config);
- return proxy;
+ else[HADOOP]*/
+ if (jt == null) {
+ @SuppressWarnings("unchecked")
+ CommunicationsInterface<I, V, E, M> proxy =
+ (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+ CommunicationsInterface.class, VERSION_ID, addr, config);
+ return proxy;
+ }
+ jt.setService(new Text(addr.getAddress().getHostAddress() + ":" +
+ addr.getPort()));
+ UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ current.addToken(jt);
+ UserGroupInformation owner =
+ UserGroupInformation.createRemoteUser(jobId);
+ owner.addToken(jt);
+ @SuppressWarnings("unchecked")
+ CommunicationsInterface<I, V, E, M> proxy =
+ owner.doAs(new PrivilegedExceptionAction<
+ CommunicationsInterface<I, V, E, M>>() {
+ @Override
+ public CommunicationsInterface<I, V, E, M> run() throws Exception {
+ // All methods in CommunicationsInterface will be used for RPC
+ return (CommunicationsInterface<I, V, E, M>) RPC.getProxy(
+ CommunicationsInterface.class, VERSION_ID, addr, config);
}
- jt.setService(new Text(addr.getAddress().getHostAddress() + ":"
- + addr.getPort()));
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- current.addToken(jt);
- UserGroupInformation owner =
- UserGroupInformation.createRemoteUser(jobId);
- owner.addToken(jt);
- @SuppressWarnings("unchecked")
- CommunicationsInterface<I, V, E, M> proxy =
- owner.doAs(new PrivilegedExceptionAction<
- CommunicationsInterface<I, V, E, M>>() {
- @Override
- public CommunicationsInterface<I, V, E, M> run() throws Exception {
- // All methods in CommunicationsInterface will be used for RPC
- return (CommunicationsInterface<I, V, E, M> )RPC.getProxy(
- CommunicationsInterface.class, versionID, addr, config);
- }
- });
- return proxy;
-/*end[HADOOP]*/
- }
+ });
+ return proxy;
+ /*end[HADOOP]*/
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java Thu Feb 16 22:12:31 2012
@@ -26,45 +26,47 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
/**
- * Interface for message communication server
+ * Interface for message communication server.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface ServerInterface<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends Closeable,
- WorkerCommunications<I, V, E, M> {
- /**
- * Setup the server.
- */
- void setup();
+ V extends Writable, E extends Writable, M extends Writable>
+ extends Closeable, WorkerCommunications<I, V, E, M> {
+ /**
+ * Setup the server.
+ */
+ void setup();
- /**
- * Move the in transition messages to the in messages for every vertex and
- * add new connections to any newly appearing RPC proxies.
- */
- void prepareSuperstep();
+ /**
+ * Move the in transition messages to the in messages for every vertex and
+ * add new connections to any newly appearing RPC proxies.
+ */
+ void prepareSuperstep();
- /**
- * Flush all outgoing messages. This will synchronously ensure that all
- * messages have been send and delivered prior to returning.
- *
- * @param context Context used to signal process
- * @return Number of messages sent during the last superstep
- * @throws IOException
- */
- long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
+ /**
+ * Flush all outgoing messages. This will synchronously ensure that all
+ * messages have been send and delivered prior to returning.
+ *
+ * @param context Context used to signal process
+ * @return Number of messages sent during the last superstep
+ * @throws IOException
+ */
+ long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException;
- /**
- * Closes all connections.
- *
- * @throws IOException
- */
- void closeConnections() throws IOException;
+ /**
+ * Closes all connections.
+ *
+ * @throws IOException
+ */
+ void closeConnections() throws IOException;
- /**
- * Shuts down.
- */
- void close();
+ /**
+ * Shuts down.
+ */
+ void close();
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java Thu Feb 16 22:12:31 2012
@@ -32,65 +32,75 @@ import org.apache.hadoop.io.WritableComp
* This object is only used for transporting list of vertices and their
* respective messages to a destination RPC server.
*
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class VertexIdMessages<I extends WritableComparable, M extends Writable>
- implements Writable, Configurable {
- /** Vertex id */
- private I vertexId;
- /** Message list corresponding to vertex id */
- private MsgList<M> msgList;
- /** Configuration from Configurable */
- private Configuration conf;
-
- /**
- * Reflective constructor.
- */
- public VertexIdMessages() {}
-
- /**
- * Constructor used with creating initial values.
- *
- * @param vertexId Vertex id to be sent
- * @param msgList Mesage list for the vertex id to be sent
- */
- public VertexIdMessages(I vertexId, MsgList<M> msgList) {
- this.vertexId = vertexId;
- this.msgList = msgList;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- vertexId = BspUtils.<I>createVertexIndex(getConf());
- vertexId.readFields(input);
- msgList = new MsgList<M>();
- msgList.setConf(getConf());
- msgList.readFields(input);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- vertexId.write(output);
- msgList.write(output);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public I getVertexId() {
- return vertexId;
- }
-
- public MsgList<M> getMessageList() {
- return msgList;
- }
- }
+ implements Writable, Configurable {
+ /** Vertex id */
+ private I vertexId;
+ /** Message list corresponding to vertex id */
+ private MsgList<M> msgList;
+ /** Configuration from Configurable */
+ private Configuration conf;
+
+ /**
+ * Reflective constructor.
+ */
+ public VertexIdMessages() { }
+
+ /**
+ * Constructor used with creating initial values.
+ *
+ * @param vertexId Vertex id to be sent
+ * @param msgList Mesage list for the vertex id to be sent
+ */
+ public VertexIdMessages(I vertexId, MsgList<M> msgList) {
+ this.vertexId = vertexId;
+ this.msgList = msgList;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ vertexId = BspUtils.<I>createVertexIndex(getConf());
+ vertexId.readFields(input);
+ msgList = new MsgList<M>();
+ msgList.setConf(getConf());
+ msgList.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ vertexId.write(output);
+ msgList.write(output);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get the vertex id.
+ *
+ * @return Vertex id.
+ */
+ public I getVertexId() {
+ return vertexId;
+ }
+
+ /**
+ * Get the message list.
+ *
+ * @return Message list.
+ */
+ public MsgList<M> getMessageList() {
+ return msgList;
+ }
+}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java Thu Feb 16 22:12:31 2012
@@ -25,27 +25,35 @@ import org.apache.hadoop.io.WritableComp
* Wrapper around {@link ArrayListWritable} that provides the list for
* {@link VertexIdMessages}.
*
- * @param <I extends Writable> vertex id
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public class VertexIdMessagesList<I extends WritableComparable,
- M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
- /** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 100L;
+ M extends Writable> extends ArrayListWritable<VertexIdMessages<I, M>> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 100L;
- public VertexIdMessagesList() {
- super();
- }
+ /**
+ * Default constructor.
+ */
+ public VertexIdMessagesList() {
+ super();
+ }
- public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
- super(vertexIdMessagesList);
- }
+ /**
+ * Copy constructor.
+ *
+ * @param vertexIdMessagesList List to be copied.
+ */
+ public VertexIdMessagesList(VertexIdMessagesList<I, M> vertexIdMessagesList) {
+ super(vertexIdMessagesList);
+ }
- @SuppressWarnings("unchecked")
- @Override
- public void setClass() {
- setClass((Class<VertexIdMessages<I, M>>)
- (new VertexIdMessages<I, M>()).getClass());
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() {
+ setClass((Class<VertexIdMessages<I, M>>)
+ (new VertexIdMessages<I, M>()).getClass());
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java Thu Feb 16 22:12:31 2012
@@ -33,24 +33,22 @@ import org.apache.hadoop.io.WritableComp
* @param <M> Message value
*/
@SuppressWarnings("rawtypes")
-public class VertexList<
- I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable>
- extends ArrayListWritable<BasicVertex<I, V, E, M>> {
- /** Defining a layout version for a serializable class. */
- private static final long serialVersionUID = 1000L;
+public class VertexList<I extends WritableComparable,
+ V extends Writable, E extends Writable,
+ M extends Writable>
+ extends ArrayListWritable<BasicVertex<I, V, E, M>> {
+ /** Defining a layout version for a serializable class. */
+ private static final long serialVersionUID = 1000L;
- /**
- * Default constructor for reflection
- */
- public VertexList() {}
+ /**
+ * Default constructor for reflection
+ */
+ public VertexList() { }
- @SuppressWarnings("unchecked")
- @Override
- public void setClass() {
- setClass((Class<BasicVertex<I, V, E, M>>)
- BspUtils.<I, V, E, M>getVertexClass(getConf()));
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setClass() {
+ setClass((Class<BasicVertex<I, V, E, M>>)
+ BspUtils.<I, V, E, M>getVertexClass(getConf()));
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java Thu Feb 16 22:12:31 2012
@@ -33,80 +33,78 @@ import java.util.Map;
/**
* Public interface for workers to do message communication
*
- * @param <I extends Writable> vertex id
- * @param <V extends Writable> vertex value
- * @param <E extends Writable> edge value
- * @param <M extends Writable> message data
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public interface WorkerCommunications<I extends WritableComparable,
- V extends Writable,
- E extends Writable,
- M extends Writable> {
- /**
- * Fix changes to the workers and the mapping between partitions and
- * workers.
- */
- void fixPartitionIdToSocketAddrMap();
-
- /**
- * Sends a message to destination vertex.
- *
- * @param id
- * @param msg
- */
- void sendMessageReq(I id, M msg);
-
- /**
- * Sends a partition to the appropriate partition owner
- *
- * @param workerInfo Owner the vertices belong to
- * @param partition Partition to send
- */
- void sendPartitionReq(WorkerInfo workerInfo,
- Partition<I, V, E, M> partition);
-
- /**
- * Sends a request to the appropriate vertex range owner to add an edge
- *
- * @param vertexIndex Index of the vertex to get the request
- * @param edge Edge to be added
- * @throws IOException
- */
- void addEdgeReq(I vertexIndex, Edge<I, E> edge) throws IOException;
-
- /**
- * Sends a request to the appropriate vertex range owner to remove an edge
- *
- * @param vertexIndex Index of the vertex to get the request
- * @param destinationVertexIndex Index of the edge to be removed
- * @throws IOException
- */
- void removeEdgeReq(I vertexIndex, I destinationVertexIndex)
- throws IOException;
-
- /**
- * Sends a request to the appropriate vertex range owner to add a vertex
- *
- * @param vertex Vertex to be added
- * @throws IOException
- */
- void addVertexReq(BasicVertex<I, V, E, M> vertex) throws IOException;
-
- /**
- * Sends a request to the appropriate vertex range owner to remove a vertex
- *
- * @param vertexIndex Index of the vertex to be removed
- * @throws IOException
- */
- void removeVertexReq(I vertexIndex) throws IOException;
-
- /**
- * Get the vertices that were sent in the last iteration. After getting
- * the map, the user should synchronize with it to insure it
- * is thread-safe.
- *
- * @return map of vertex ranges to vertices
- */
- Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
+ V extends Writable, E extends Writable, M extends Writable> {
+ /**
+ * Fix changes to the workers and the mapping between partitions and
+ * workers.
+ */
+ void fixPartitionIdToSocketAddrMap();
+
+ /**
+ * Sends a message to destination vertex.
+ *
+ * @param destVertexId Destination vertex id.
+ * @param message Message to send.
+ */
+ void sendMessageReq(I destVertexId, M message);
+
+ /**
+ * Sends a partition to the appropriate partition owner
+ *
+ * @param workerInfo Owner the vertices belong to
+ * @param partition Partition to send
+ */
+ void sendPartitionReq(WorkerInfo workerInfo,
+ Partition<I, V, E, M> partition);
+
+ /**
+ * Sends a request to the appropriate vertex range owner to add an edge
+ *
+ * @param vertexIndex Index of the vertex to get the request
+ * @param edge Edge to be added
+ * @throws IOException
+ */
+ void addEdgeReq(I vertexIndex, Edge<I, E> edge) throws IOException;
+
+ /**
+ * Sends a request to the appropriate vertex range owner to remove an edge
+ *
+ * @param vertexIndex Index of the vertex to get the request
+ * @param destinationVertexIndex Index of the edge to be removed
+ * @throws IOException
+ */
+ void removeEdgeReq(I vertexIndex, I destinationVertexIndex)
+ throws IOException;
+
+ /**
+ * Sends a request to the appropriate vertex range owner to add a vertex
+ *
+ * @param vertex Vertex to be added
+ * @throws IOException
+ */
+ void addVertexReq(BasicVertex<I, V, E, M> vertex) throws IOException;
+
+ /**
+ * Sends a request to the appropriate vertex range owner to remove a vertex
+ *
+ * @param vertexIndex Index of the vertex to be removed
+ * @throws IOException
+ */
+ void removeVertexReq(I vertexIndex) throws IOException;
+
+ /**
+ * Get the vertices that were sent in the last iteration. After getting
+ * the map, the user should synchronize with it to insure it
+ * is thread-safe.
+ *
+ * @return map of vertex ranges to vertices
+ */
+ Map<Integer, List<BasicVertex<I, V, E, M>>> getInPartitionVertexMap();
}
Copied: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java (from r1243701, incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java)
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java?p2=incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java&p1=incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java&r1=1243701&r2=1245205&rev=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/bsp/ApplicationState.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java Thu Feb 16 22:12:31 2012
@@ -15,15 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.giraph.bsp;
-
/**
- * State of the BSP application
+ * Package of communication related objects, RPC service.
*/
-public enum ApplicationState {
- UNKNOWN, ///< Shouldn't be seen, just an initial state
- START_SUPERSTEP, ///< Start from a desired superstep
- FAILED, ///< Unrecoverable
- FINISHED ///< Successful completion
-}
+package org.apache.giraph.comm;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.examples;
@@ -25,72 +25,72 @@ import java.io.IOException;
import java.util.Iterator;
/**
- * Implementation of the HCC algorithm that identifies connected components and assigns each
- * vertex its "component identifier" (the smallest vertex id in the component)
+ * Implementation of the HCC algorithm that identifies connected components and
+ * assigns each vertex its "component identifier" (the smallest vertex id
+ * in the component)
*
- * The idea behind the algorithm is very simple: propagate the smallest vertex id along the
- * edges to all vertices of a connected component. The number of supersteps necessary is
- * equal to the length of the maximum diameter of all components + 1
+ * The idea behind the algorithm is very simple: propagate the smallest
+ * vertex id along the edges to all vertices of a connected component. The
+ * number of supersteps necessary is equal to the length of the maximum
+ * diameter of all components + 1
*
- * The original Hadoop-based variant of this algorithm was proposed by Kang, Charalampos
- * Tsourakakis and Faloutsos in "PEGASUS: Mining Peta-Scale Graphs", 2010
+ * The original Hadoop-based variant of this algorithm was proposed by Kang,
+ * Charalampos, Tsourakakis and Faloutsos in
+ * "PEGASUS: Mining Peta-Scale Graphs", 2010
*
* http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
*/
public class ConnectedComponentsVertex extends IntIntNullIntVertex {
-
- /**
- * Propagates the smallest vertex id to all neighbors. Will always choose to halt and only
- * reactivate if a smaller id has been sent to it.
- *
- * @param messages
- * @throws IOException
- */
- @Override
- public void compute(Iterator<IntWritable> messages) throws IOException {
-
- int currentComponent = getVertexValue().get();
-
- // first superstep is special, because we can simply look at the neighbors
- if (getSuperstep() == 0) {
- for (Iterator<IntWritable> edges = iterator(); edges.hasNext();) {
- int neighbor = edges.next().get();
- if (neighbor < currentComponent) {
- currentComponent = neighbor;
- }
- }
- // only need to send value if it is not the own id
- if (currentComponent != getVertexValue().get()) {
- setVertexValue(new IntWritable(currentComponent));
- for (Iterator<IntWritable> edges = iterator();
- edges.hasNext();) {
- int neighbor = edges.next().get();
- if (neighbor > currentComponent) {
- sendMsg(new IntWritable(neighbor), getVertexValue());
- }
- }
- }
-
- voteToHalt();
- return;
+ /**
+ * Propagates the smallest vertex id to all neighbors. Will always choose to
+ * halt and only reactivate if a smaller id has been sent to it.
+ *
+ * @param messages Iterator of messages from the previous superstep.
+ * @throws IOException
+ */
+ @Override
+ public void compute(Iterator<IntWritable> messages) throws IOException {
+ int currentComponent = getVertexValue().get();
+
+ // First superstep is special, because we can simply look at the neighbors
+ if (getSuperstep() == 0) {
+ for (Iterator<IntWritable> edges = iterator(); edges.hasNext();) {
+ int neighbor = edges.next().get();
+ if (neighbor < currentComponent) {
+ currentComponent = neighbor;
}
-
- boolean changed = false;
- // did we get a smaller id ?
- while (messages.hasNext()) {
- int candidateComponent = messages.next().get();
- if (candidateComponent < currentComponent) {
- currentComponent = candidateComponent;
- changed = true;
- }
+ }
+ // Only need to send value if it is not the own id
+ if (currentComponent != getVertexValue().get()) {
+ setVertexValue(new IntWritable(currentComponent));
+ for (Iterator<IntWritable> edges = iterator();
+ edges.hasNext();) {
+ int neighbor = edges.next().get();
+ if (neighbor > currentComponent) {
+ sendMsg(new IntWritable(neighbor), getVertexValue());
+ }
}
+ }
- // propagate new component id to the neighbors
- if (changed) {
- setVertexValue(new IntWritable(currentComponent));
- sendMsgToAllEdges(getVertexValue());
- }
- voteToHalt();
+ voteToHalt();
+ return;
}
+ boolean changed = false;
+ // did we get a smaller id ?
+ while (messages.hasNext()) {
+ int candidateComponent = messages.next().get();
+ if (candidateComponent < currentComponent) {
+ currentComponent = candidateComponent;
+ changed = true;
+ }
+ }
+
+ // propagate new component id to the neighbors
+ if (changed) {
+ setVertexValue(new IntWritable(currentComponent));
+ sendMsgToAllEdges(getVertexValue());
+ }
+ voteToHalt();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexInputFormat.java Thu Feb 16 22:12:31 2012
@@ -32,22 +32,25 @@ import java.util.List;
/**
* This VertexInputFormat is meant for testing/debugging. It simply generates
* some vertex data that can be consumed by test applications.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class GeneratedVertexInputFormat<
- I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable>
- extends VertexInputFormat<I, V, E, M> {
-
- @Override
- public List<InputSplit> getSplits(JobContext context, int numWorkers)
- throws IOException, InterruptedException {
- // This is meaningless, the VertexReader will generate all the test
- // data.
- List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
- for (int i = 0; i < numWorkers; ++i) {
- inputSplitList.add(new BspInputSplit(i, numWorkers));
- }
- return inputSplitList;
+ I extends WritableComparable, V extends Writable, E extends Writable,
+ M extends Writable> extends VertexInputFormat<I, V, E, M> {
+ @Override
+ public List<InputSplit> getSplits(JobContext context, int numWorkers)
+ throws IOException, InterruptedException {
+ // This is meaningless, the VertexReader will generate all the test
+ // data.
+ List<InputSplit> inputSplitList = new ArrayList<InputSplit>();
+ for (int i = 0; i < numWorkers; ++i) {
+ inputSplitList.add(new BspInputSplit(i, numWorkers));
}
+ return inputSplitList;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java Thu Feb 16 22:12:31 2012
@@ -34,53 +34,58 @@ import java.io.IOException;
* @param <I> Vertex index value
* @param <V> Vertex value
* @param <E> Edge value
+ * @param <M> Message data
*/
@SuppressWarnings("rawtypes")
public abstract class GeneratedVertexReader<
- I extends WritableComparable, V extends Writable, E extends Writable,
- M extends Writable>
- implements VertexReader<I, V, E, M> {
- /** Records read so far */
- protected long recordsRead = 0;
- /** Total records to read (on this split alone) */
- protected long totalRecords = 0;
- /** The input split from initialize(). */
- protected BspInputSplit inputSplit = null;
- /** Reverse the id order? */
- protected boolean reverseIdOrder;
-
- protected Configuration configuration = null;
-
- public static final String READER_VERTICES =
- "GeneratedVertexReader.reader_vertices";
- public static final long DEFAULT_READER_VERTICES = 10;
- public static final String REVERSE_ID_ORDER =
- "GeneratedVertexReader.reverseIdOrder";
- public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
-
- public GeneratedVertexReader() {
- }
-
- @Override
- final public void initialize(InputSplit inputSplit,
- TaskAttemptContext context)
- throws IOException {
- configuration = context.getConfiguration();
- totalRecords = configuration.getLong(
- GeneratedVertexReader.READER_VERTICES,
- GeneratedVertexReader.DEFAULT_READER_VERTICES);
- reverseIdOrder = configuration.getBoolean(
- GeneratedVertexReader.REVERSE_ID_ORDER,
- GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
- this.inputSplit = (BspInputSplit) inputSplit;
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- final public float getProgress() throws IOException {
- return recordsRead * 100.0f / totalRecords;
- }
+ I extends WritableComparable, V extends Writable, E extends Writable,
+ M extends Writable> implements VertexReader<I, V, E, M> {
+ /** Vertices produced by this reader */
+ public static final String READER_VERTICES =
+ "GeneratedVertexReader.reader_vertices";
+ /** Default vertices produced by this reader */
+ public static final long DEFAULT_READER_VERTICES = 10;
+ /** Reverse the order of the vertices? */
+ public static final String REVERSE_ID_ORDER =
+ "GeneratedVertexReader.reverseIdOrder";
+ /** Default ordering is not reversed */
+ public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+ /** Records read so far */
+ protected long recordsRead = 0;
+ /** Total records to read (on this split alone) */
+ protected long totalRecords = 0;
+ /** The input split from initialize(). */
+ protected BspInputSplit inputSplit = null;
+ /** Reverse the id order? */
+ protected boolean reverseIdOrder;
+ /** Saved configuration */
+ protected Configuration configuration = null;
+
+ /**
+ * Default constructor for reflection.
+ */
+ public GeneratedVertexReader() {
+ }
+
+ @Override
+ public final void initialize(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException {
+ configuration = context.getConfiguration();
+ totalRecords = configuration.getLong(
+ GeneratedVertexReader.READER_VERTICES,
+ GeneratedVertexReader.DEFAULT_READER_VERTICES);
+ reverseIdOrder = configuration.getBoolean(
+ GeneratedVertexReader.REVERSE_ID_ORDER,
+ GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+ this.inputSplit = (BspInputSplit) inputSplit;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public final float getProgress() throws IOException {
+ return recordsRead * 100.0f / totalRecords;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/IntIntNullIntTextInputFormat.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.examples;
@@ -37,61 +37,68 @@ import java.util.Map;
import java.util.regex.Pattern;
/**
- * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for unweighted
- * graphs with int ids.
+ * Simple text-based {@link org.apache.giraph.graph.VertexInputFormat} for
+ * unweighted graphs with int ids.
*
* Each line consists of: vertex neighbor1 neighbor2 ...
*/
public class IntIntNullIntTextInputFormat extends
- TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
- IntWritable> {
+ TextVertexInputFormat<IntWritable, IntWritable, NullWritable,
+ IntWritable> {
- @Override
- public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
- createVertexReader(InputSplit split, TaskAttemptContext context)
- throws IOException {
- return new IntIntNullIntVertexReader(
- textInputFormat.createRecordReader(split, context));
+ @Override
+ public VertexReader<IntWritable, IntWritable, NullWritable, IntWritable>
+ createVertexReader(InputSplit split, TaskAttemptContext context)
+ throws IOException {
+ return new IntIntNullIntVertexReader(
+ textInputFormat.createRecordReader(split, context));
+ }
+
+ /**
+ * Vertex reader associated with {@link IntIntNullIntTextInputFormat}.
+ */
+ public static class IntIntNullIntVertexReader extends
+ TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
+ NullWritable, IntWritable> {
+ /** Separator of the vertex and neighbors */
+ private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+ /**
+ * Constructor with the line reader.
+ *
+ * @param lineReader Internal line reader.
+ */
+ public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
+ lineReader) {
+ super(lineReader);
}
- public static class IntIntNullIntVertexReader extends
- TextVertexInputFormat.TextVertexReader<IntWritable, IntWritable,
- NullWritable, IntWritable> {
-
- private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
-
- public IntIntNullIntVertexReader(RecordReader<LongWritable, Text>
- lineReader) {
- super(lineReader);
- }
-
- @Override
- public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
- getCurrentVertex() throws IOException, InterruptedException {
- BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
- vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
- IntWritable>createVertex(getContext().getConfiguration());
-
- String[] tokens = SEPARATOR.split(getRecordReader()
- .getCurrentValue().toString());
- Map<IntWritable, NullWritable> edges =
- Maps.newHashMapWithExpectedSize(tokens.length - 1);
- for (int n = 1; n < tokens.length; n++) {
- edges.put(new IntWritable(Integer.parseInt(tokens[n])),
- NullWritable.get());
- }
-
- IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
- vertex.initialize(vertexId, vertexId, edges,
- Lists.<IntWritable>newArrayList());
-
- return vertex;
- }
-
- @Override
- public boolean nextVertex() throws IOException, InterruptedException {
- return getRecordReader().nextKeyValue();
- }
+ @Override
+ public BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+ getCurrentVertex() throws IOException, InterruptedException {
+ BasicVertex<IntWritable, IntWritable, NullWritable, IntWritable>
+ vertex = BspUtils.<IntWritable, IntWritable, NullWritable,
+ IntWritable>createVertex(getContext().getConfiguration());
+
+ String[] tokens = SEPARATOR.split(getRecordReader()
+ .getCurrentValue().toString());
+ Map<IntWritable, NullWritable> edges =
+ Maps.newHashMapWithExpectedSize(tokens.length - 1);
+ for (int n = 1; n < tokens.length; n++) {
+ edges.put(new IntWritable(Integer.parseInt(tokens[n])),
+ NullWritable.get());
+ }
+
+ IntWritable vertexId = new IntWritable(Integer.parseInt(tokens[0]));
+ vertex.initialize(vertexId, vertexId, edges,
+ Lists.<IntWritable>newArrayList());
+
+ return vertex;
}
+ @Override
+ public boolean nextVertex() throws IOException, InterruptedException {
+ return getRecordReader().nextKeyValue();
+ }
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java Thu Feb 16 22:12:31 2012
@@ -26,30 +26,35 @@ import org.apache.giraph.graph.Aggregato
* Aggregator for summing up values.
*/
public class LongSumAggregator implements Aggregator<LongWritable> {
- /** Internal sum */
- private long sum = 0;
+ /** Internal sum */
+ private long sum = 0;
- public void aggregate(long value) {
- sum += value;
- }
+ /**
+ * Aggregate with a primitive long.
+ *
+ * @param value Long value to aggregate.
+ */
+ public void aggregate(long value) {
+ sum += value;
+ }
- @Override
- public void aggregate(LongWritable value) {
- sum += value.get();
- }
+ @Override
+ public void aggregate(LongWritable value) {
+ sum += value.get();
+ }
- @Override
- public void setAggregatedValue(LongWritable value) {
- sum = value.get();
- }
+ @Override
+ public void setAggregatedValue(LongWritable value) {
+ sum = value.get();
+ }
- @Override
- public LongWritable getAggregatedValue() {
- return new LongWritable(sum);
- }
+ @Override
+ public LongWritable getAggregatedValue() {
+ return new LongWritable(sum);
+ }
- @Override
- public LongWritable createAggregatedValue() {
- return new LongWritable();
- }
+ @Override
+ public LongWritable createAggregatedValue() {
+ return new LongWritable();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MaxAggregator.java Thu Feb 16 22:12:31 2012
@@ -25,29 +25,31 @@ import org.apache.giraph.graph.Aggregato
/**
* Aggregator for getting max value.
*
- **/
-
+ */
public class MaxAggregator implements Aggregator<DoubleWritable> {
-
+ /** Saved maximum value */
private double max = Double.MIN_VALUE;
+ @Override
public void aggregate(DoubleWritable value) {
- double val = value.get();
- if (val > max) {
- max = val;
- }
+ double val = value.get();
+ if (val > max) {
+ max = val;
+ }
}
+ @Override
public void setAggregatedValue(DoubleWritable value) {
- max = value.get();
+ max = value.get();
}
+ @Override
public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(max);
+ return new DoubleWritable(max);
}
+ @Override
public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
+ return new DoubleWritable();
}
-
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinAggregator.java Thu Feb 16 22:12:31 2012
@@ -24,30 +24,32 @@ import org.apache.giraph.graph.Aggregato
/**
* Aggregator for getting min value.
- *
- **/
-
+ */
public class MinAggregator implements Aggregator<DoubleWritable> {
-
+ /** Internal aggregator */
private double min = Double.MAX_VALUE;
+ @Override
public void aggregate(DoubleWritable value) {
- double val = value.get();
- if (val < min) {
- min = val;
- }
+ double val = value.get();
+ if (val < min) {
+ min = val;
+ }
}
+ @Override
public void setAggregatedValue(DoubleWritable value) {
- min = value.get();
+ min = value.get();
}
+ @Override
public DoubleWritable getAggregatedValue() {
- return new DoubleWritable(min);
+ return new DoubleWritable(min);
}
+ @Override
public DoubleWritable createAggregatedValue() {
- return new DoubleWritable();
+ return new DoubleWritable();
}
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/MinimumIntCombiner.java Thu Feb 16 22:12:31 2012
@@ -1,20 +1,20 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.examples;
@@ -29,20 +29,19 @@ import java.util.List;
* {@link VertexCombiner} that finds the minimum {@link IntWritable}
*/
public class MinimumIntCombiner
- extends VertexCombiner<IntWritable, IntWritable> {
-
- @Override
- public Iterable<IntWritable> combine(IntWritable target,
- Iterable<IntWritable> messages) throws IOException {
- int minimum = Integer.MAX_VALUE;
- for (IntWritable message : messages) {
- if (message.get() < minimum) {
- minimum = message.get();
- }
- }
- List<IntWritable> value = new ArrayList<IntWritable>();
- value.add(new IntWritable(minimum));
-
- return value;
+ extends VertexCombiner<IntWritable, IntWritable> {
+ @Override
+ public Iterable<IntWritable> combine(IntWritable target,
+ Iterable<IntWritable> messages) throws IOException {
+ int minimum = Integer.MAX_VALUE;
+ for (IntWritable message : messages) {
+ if (message.get() < minimum) {
+ minimum = message.get();
+ }
}
+ List<IntWritable> value = new ArrayList<IntWritable>();
+ value.add(new IntWritable(minimum));
+
+ return value;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleAggregatorWriter.java Thu Feb 16 22:12:31 2012
@@ -37,32 +37,37 @@ import org.apache.hadoop.mapreduce.Mappe
* directory.
*/
public class SimpleAggregatorWriter implements AggregatorWriter {
- /** the name of the file we wrote to */
- public static String filename;
- private FSDataOutputStream output;
-
- @SuppressWarnings("rawtypes")
- @Override
- public void initialize(Context context, long applicationAttempt)
- throws IOException {
- filename = "aggregatedValues_"+applicationAttempt;
- Path p = new Path(filename);
- FileSystem fs = FileSystem.get(context.getConfiguration());
- output = fs.create(p, true);
- }
+ /** Name of the file we wrote to */
+ private static String FILENAME;
+ /** Saved output stream to write to */
+ private FSDataOutputStream output;
- @Override
- public void writeAggregator(Map<String, Aggregator<Writable>> map,
- long superstep) throws IOException {
+ public static String getFilename() {
+ return FILENAME;
+ }
- for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
- aggregator.getValue().getAggregatedValue().write(output);
- }
- output.flush();
- }
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void initialize(Context context, long applicationAttempt)
+ throws IOException {
+ FILENAME = "aggregatedValues_" + applicationAttempt;
+ Path p = new Path(FILENAME);
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ output = fs.create(p, true);
+ }
- @Override
- public void close() throws IOException {
- output.close();
+ @Override
+ public void writeAggregator(Map<String, Aggregator<Writable>> map,
+ long superstep) throws IOException {
+
+ for (Entry<String, Aggregator<Writable>> aggregator: map.entrySet()) {
+ aggregator.getValue().getAggregatedValue().write(output);
}
+ output.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ output.close();
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCheckpointVertex.java Thu Feb 16 22:12:31 2012
@@ -18,8 +18,14 @@
package org.apache.giraph.examples;
-import org.apache.commons.cli.*;
-import org.apache.giraph.graph.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.graph.EdgeListVertex;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
@@ -38,208 +44,222 @@ import java.util.Iterator;
* can also test automated checkpoint restarts.
*/
public class SimpleCheckpointVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
- implements Tool {
- private static Logger LOG =
- Logger.getLogger(SimpleCheckpointVertex.class);
- /** Configuration */
- private Configuration conf;
- /** Which superstep to cause the worker to fail */
- public final int faultingSuperstep = 4;
- /** Vertex id to fault on */
- public final long faultingVertexId = 1;
- /** Dynamically set number of supersteps */
- public static final String SUPERSTEP_COUNT =
- "simpleCheckpointVertex.superstepCount";
- /** Should fault? */
- public static final String ENABLE_FAULT=
- "simpleCheckpointVertex.enableFault";
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, FloatWritable>
+ implements Tool {
+ /** Which superstep to cause the worker to fail */
+ public static final int FAULTING_SUPERSTEP = 4;
+ /** Vertex id to fault on */
+ public static final long FAULTING_VERTEX_ID = 1;
+ /** Dynamically set number of supersteps */
+ public static final String SUPERSTEP_COUNT =
+ "simpleCheckpointVertex.superstepCount";
+ /** Should fault? */
+ public static final String ENABLE_FAULT =
+ "simpleCheckpointVertex.enableFault";
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SimpleCheckpointVertex.class);
+ /** Configuration */
+ private Configuration conf;
+
+ @Override
+ public void compute(Iterator<FloatWritable> msgIterator) {
+ SimpleCheckpointVertexWorkerContext workerContext =
+ (SimpleCheckpointVertexWorkerContext) getWorkerContext();
+
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+
+ boolean enableFault = workerContext.getEnableFault();
+ int supersteps = workerContext.getSupersteps();
+
+ if (enableFault && (getSuperstep() == FAULTING_SUPERSTEP) &&
+ (getContext().getTaskAttemptID().getId() == 0) &&
+ (getVertexId().get() == FAULTING_VERTEX_ID)) {
+ LOG.info("compute: Forced a fault on the first " +
+ "attempt of superstep " +
+ FAULTING_SUPERSTEP + " and vertex id " +
+ FAULTING_VERTEX_ID);
+ System.exit(-1);
+ }
+ if (getSuperstep() > supersteps) {
+ voteToHalt();
+ return;
+ }
+ LOG.info("compute: " + sumAggregator);
+ sumAggregator.aggregate(getVertexId().get());
+ LOG.info("compute: sum = " +
+ sumAggregator.getAggregatedValue().get() +
+ " for vertex " + getVertexId());
+ float msgValue = 0.0f;
+ while (msgIterator.hasNext()) {
+ float curMsgValue = msgIterator.next().get();
+ msgValue += curMsgValue;
+ LOG.info("compute: got msgValue = " + curMsgValue +
+ " for vertex " + getVertexId() +
+ " on superstep " + getSuperstep());
+ }
+ int vertexValue = getVertexValue().get();
+ setVertexValue(new IntWritable(vertexValue + (int) msgValue));
+ LOG.info("compute: vertex " + getVertexId() +
+ " has value " + getVertexValue() +
+ " on superstep " + getSuperstep());
+ for (LongWritable targetVertexId : this) {
+ FloatWritable edgeValue = getEdgeValue(targetVertexId);
+ LOG.info("compute: vertex " + getVertexId() +
+ " sending edgeValue " + edgeValue +
+ " vertexValue " + vertexValue +
+ " total " + (edgeValue.get() +
+ (float) vertexValue) +
+ " to vertex " + targetVertexId +
+ " on superstep " + getSuperstep());
+ edgeValue.set(edgeValue.get() + (float) vertexValue);
+ addEdge(targetVertexId, edgeValue);
+ sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
+ }
+ }
+
+ /**
+ * Worker context associated with {@link SimpleCheckpointVertex}.
+ */
+ public static class SimpleCheckpointVertexWorkerContext
+ extends WorkerContext {
+ /** Filename to indicate whether a fault was found */
+ public static final String FAULT_FILE = "/tmp/faultFile";
+ /** User can access this after the application finishes if local */
+ private static long FINAL_SUM;
+ /** Number of supersteps to run (6 by default) */
+ private int supersteps = 6;
+ /** Enable the fault at the particular vertex id and superstep? */
+ private boolean enableFault = false;
- @Override
- public void compute(Iterator<FloatWritable> msgIterator) {
- SimpleCheckpointVertexWorkerContext workerContext =
- (SimpleCheckpointVertexWorkerContext) getWorkerContext();
-
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
-
- boolean enableFault = workerContext.getEnableFault();
- int supersteps = workerContext.getSupersteps();
-
- if (enableFault && (getSuperstep() == faultingSuperstep) &&
- (getContext().getTaskAttemptID().getId() == 0) &&
- (getVertexId().get() == faultingVertexId)) {
- System.out.println("compute: Forced a fault on the first " +
- "attempt of superstep " +
- faultingSuperstep + " and vertex id " +
- faultingVertexId);
- System.exit(-1);
- }
- if (getSuperstep() > supersteps) {
- voteToHalt();
- return;
- }
- System.out.println("compute: " + sumAggregator);
- sumAggregator.aggregate(getVertexId().get());
- System.out.println("compute: sum = " +
- sumAggregator.getAggregatedValue().get() +
- " for vertex " + getVertexId());
- float msgValue = 0.0f;
- while (msgIterator.hasNext()) {
- float curMsgValue = msgIterator.next().get();
- msgValue += curMsgValue;
- System.out.println("compute: got msgValue = " + curMsgValue +
- " for vertex " + getVertexId() +
- " on superstep " + getSuperstep());
- }
- int vertexValue = getVertexValue().get();
- setVertexValue(new IntWritable(vertexValue + (int) msgValue));
- System.out.println("compute: vertex " + getVertexId() +
- " has value " + getVertexValue() +
- " on superstep " + getSuperstep());
- for (LongWritable targetVertexId : this) {
- FloatWritable edgeValue = getEdgeValue(targetVertexId);
- System.out.println("compute: vertex " + getVertexId() +
- " sending edgeValue " + edgeValue +
- " vertexValue " + vertexValue +
- " total " + (edgeValue.get() +
- (float) vertexValue) +
- " to vertex " + targetVertexId +
- " on superstep " + getSuperstep());
- edgeValue.set(edgeValue.get() + (float) vertexValue);
- addEdge(targetVertexId, edgeValue);
- sendMsg(targetVertexId, new FloatWritable(edgeValue.get()));
- }
- }
-
- public static class SimpleCheckpointVertexWorkerContext
- extends WorkerContext {
- /** User can access this after the application finishes if local */
- public static long finalSum;
- /** Number of supersteps to run (6 by default) */
- private int supersteps = 6;
- /** Filename to indicate whether a fault was found */
- public final String faultFile = "/tmp/faultFile";
- /** Enable the fault at the particular vertex id and superstep? */
- private boolean enableFault = false;
-
- @Override
- public void preApplication()
- throws InstantiationException, IllegalAccessException {
- registerAggregator(LongSumAggregator.class.getName(),
- LongSumAggregator.class);
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- sumAggregator.setAggregatedValue(new LongWritable(0));
- supersteps = getContext().getConfiguration()
- .getInt(SUPERSTEP_COUNT, supersteps);
- enableFault = getContext().getConfiguration()
- .getBoolean(ENABLE_FAULT, false);
- }
-
- @Override
- public void postApplication() {
- LongSumAggregator sumAggregator = (LongSumAggregator)
- getAggregator(LongSumAggregator.class.getName());
- finalSum = sumAggregator.getAggregatedValue().get();
- LOG.info("finalSum="+ finalSum);
- }
-
- @Override
- public void preSuperstep() {
- useAggregator(LongSumAggregator.class.getName());
- }
-
- @Override
- public void postSuperstep() { }
-
- public int getSupersteps() {
- return this.supersteps;
- }
-
- public boolean getEnableFault() {
- return this.enableFault;
- }
+ public static long getFinalSum() {
+ return FINAL_SUM;
}
@Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- options.addOption("h", "help", false, "Help");
- options.addOption("v", "verbose", false, "Verbose");
- options.addOption("w",
- "workers",
- true,
- "Number of workers");
- options.addOption("s",
- "supersteps",
- true,
- "Supersteps to execute before finishing");
- options.addOption("w",
- "workers",
- true,
- "Minimum number of workers");
- options.addOption("o",
- "outputDirectory",
- true,
- "Output directory");
- HelpFormatter formatter = new HelpFormatter();
- if (args.length == 0) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse(options, args);
- if (cmd.hasOption('h')) {
- formatter.printHelp(getClass().getName(), options, true);
- return 0;
- }
- if (!cmd.hasOption('w')) {
- System.out.println("Need to choose the number of workers (-w)");
- return -1;
- }
- if (!cmd.hasOption('o')) {
- System.out.println("Need to set the output directory (-o)");
- return -1;
- }
-
- GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
- bspJob.setVertexClass(getClass());
- bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
- bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
- bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
- int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
- int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
- bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
-
- FileOutputFormat.setOutputPath(bspJob,
- new Path(cmd.getOptionValue('o')));
- boolean verbose = false;
- if (cmd.hasOption('v')) {
- verbose = true;
- }
- if (cmd.hasOption('s')) {
- getConf().setInt(SUPERSTEP_COUNT,
- Integer.parseInt(cmd.getOptionValue('s')));
- }
- if (bspJob.run(verbose) == true) {
- return 0;
- } else {
- return -1;
- }
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ registerAggregator(LongSumAggregator.class.getName(),
+ LongSumAggregator.class);
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ sumAggregator.setAggregatedValue(new LongWritable(0));
+ supersteps = getContext().getConfiguration()
+ .getInt(SUPERSTEP_COUNT, supersteps);
+ enableFault = getContext().getConfiguration()
+ .getBoolean(ENABLE_FAULT, false);
}
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
+ @Override
+ public void postApplication() {
+ LongSumAggregator sumAggregator = (LongSumAggregator)
+ getAggregator(LongSumAggregator.class.getName());
+ FINAL_SUM = sumAggregator.getAggregatedValue().get();
+ LOG.info("FINAL_SUM=" + FINAL_SUM);
}
@Override
- public Configuration getConf() {
- return conf;
+ public void preSuperstep() {
+ useAggregator(LongSumAggregator.class.getName());
}
@Override
- public void setConf(Configuration conf) {
- this.conf = conf;
+ public void postSuperstep() { }
+
+ public int getSupersteps() {
+ return this.supersteps;
+ }
+
+ public boolean getEnableFault() {
+ return this.enableFault;
}
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ options.addOption("h", "help", false, "Help");
+ options.addOption("v", "verbose", false, "Verbose");
+ options.addOption("w",
+ "workers",
+ true,
+ "Number of workers");
+ options.addOption("s",
+ "supersteps",
+ true,
+ "Supersteps to execute before finishing");
+ options.addOption("w",
+ "workers",
+ true,
+ "Minimum number of workers");
+ options.addOption("o",
+ "outputDirectory",
+ true,
+ "Output directory");
+ HelpFormatter formatter = new HelpFormatter();
+ if (args.length == 0) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = parser.parse(options, args);
+ if (cmd.hasOption('h')) {
+ formatter.printHelp(getClass().getName(), options, true);
+ return 0;
+ }
+ if (!cmd.hasOption('w')) {
+ LOG.info("Need to choose the number of workers (-w)");
+ return -1;
+ }
+ if (!cmd.hasOption('o')) {
+ LOG.info("Need to set the output directory (-o)");
+ return -1;
+ }
+
+ GiraphJob bspJob = new GiraphJob(getConf(), getClass().getName());
+ bspJob.setVertexClass(getClass());
+ bspJob.setVertexInputFormatClass(GeneratedVertexInputFormat.class);
+ bspJob.setVertexOutputFormatClass(SimpleTextVertexOutputFormat.class);
+ bspJob.setWorkerContextClass(SimpleCheckpointVertexWorkerContext.class);
+ int minWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ int maxWorkers = Integer.parseInt(cmd.getOptionValue('w'));
+ bspJob.setWorkerConfiguration(minWorkers, maxWorkers, 100.0f);
+
+ FileOutputFormat.setOutputPath(bspJob,
+ new Path(cmd.getOptionValue('o')));
+ boolean verbose = false;
+ if (cmd.hasOption('v')) {
+ verbose = true;
+ }
+ if (cmd.hasOption('s')) {
+ getConf().setInt(SUPERSTEP_COUNT,
+ Integer.parseInt(cmd.getOptionValue('s')));
+ }
+ if (bspJob.run(verbose)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Executable from the command line.
+ *
+ * @param args Command line args.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ System.exit(ToolRunner.run(new SimpleCheckpointVertex(), args));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
}
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleCombinerVertex.java Thu Feb 16 22:12:31 2012
@@ -23,6 +23,7 @@ import java.util.Iterator;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.log4j.Logger;
import org.apache.giraph.graph.EdgeListVertex;
@@ -30,36 +31,38 @@ import org.apache.giraph.graph.EdgeListV
* Test whether messages can go through a combiner.
*/
public class SimpleCombinerVertex extends
- EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
- @Override
- public void compute(Iterator<IntWritable> msgIterator) {
- if (getVertexId().equals(new LongWritable(2))) {
- sendMsg(new LongWritable(1), new IntWritable(101));
- sendMsg(new LongWritable(1), new IntWritable(102));
- sendMsg(new LongWritable(1), new IntWritable(103));
- }
- if (!getVertexId().equals(new LongWritable(1))) {
- voteToHalt();
- }
- else {
- // Check the messages
- int sum = 0;
- int num = 0;
- while (msgIterator != null && msgIterator.hasNext()) {
- sum += msgIterator.next().get();
- num++;
- }
- System.out.println("TestCombinerVertex: Received a sum of " + sum +
- " (should have 306 with a single message value)");
+ EdgeListVertex<LongWritable, IntWritable, FloatWritable, IntWritable> {
+ /** Class logger */
+ private static Logger LOG = Logger.getLogger(SimpleCombinerVertex.class);
- if (num == 1 && sum == 306) {
- voteToHalt();
- }
- }
- if (getSuperstep() > 3) {
- throw new IllegalStateException(
- "TestCombinerVertex: Vertex 1 failed to receive " +
- "messages in time");
- }
+ @Override
+ public void compute(Iterator<IntWritable> msgIterator) {
+ if (getVertexId().equals(new LongWritable(2))) {
+ sendMsg(new LongWritable(1), new IntWritable(101));
+ sendMsg(new LongWritable(1), new IntWritable(102));
+ sendMsg(new LongWritable(1), new IntWritable(103));
}
+ if (!getVertexId().equals(new LongWritable(1))) {
+ voteToHalt();
+ } else {
+ // Check the messages
+ int sum = 0;
+ int num = 0;
+ while (msgIterator != null && msgIterator.hasNext()) {
+ sum += msgIterator.next().get();
+ num++;
+ }
+ LOG.info("TestCombinerVertex: Received a sum of " + sum +
+ " (should have 306 with a single message value)");
+
+ if (num == 1 && sum == 306) {
+ voteToHalt();
+ }
+ }
+ if (getSuperstep() > 3) {
+ throw new IllegalStateException(
+ "TestCombinerVertex: Vertex 1 failed to receive " +
+ "messages in time");
+ }
+ }
}