You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/10/11 18:48:47 UTC

svn commit: r1397159 [1/2] - in /giraph/trunk: ./ giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/ giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/ giraph-formats-contrib/src/test/java/org/apache/giraph/...

Author: apresta
Date: Thu Oct 11 16:48:45 2012
New Revision: 1397159

URL: http://svn.apache.org/viewvc?rev=1397159&view=rev
Log:
GIRAPH-200: remove hadoop RPC and keep just netty (apresta)

Added:
    giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
Removed:
    giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
    giraph/trunk/src/main/java/org/apache/giraph/hadoop/
    giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
    giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
    giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
    giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
    giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
    giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
    giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
    giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
    giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Oct 11 16:48:45 2012
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-200: remove hadoop RPC and keep just netty (apresta)
+
   GIRAPH-363: Fix hadoop_0.23 profile broken by GIRAPH-211 (ekoontz)
 
   GIRAPH-211: Add secure authentication to Netty IPC (ekoontz)

Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -237,7 +237,7 @@ public abstract class HCatalogVertexInpu
                     BspUtils.createVertex(getContext()
                             .getConfiguration());
       vertex.initialize(getVertexId(record), getVertexValue(record),
-                    getEdges(record), null);
+                    getEdges(record));
       ++recordCount;
       if ((recordCount % recordModLimit) == 0) {
         log.info("read " + recordCount + " records");
@@ -373,9 +373,8 @@ public abstract class HCatalogVertexInpu
     private void createCurrentVertex() {
       vertex = BspUtils.
               createVertex(getContext().getConfiguration());
-      vertex.initialize(currentVertexId,
-              getVertexValue(
-                      recordsForVertex), destEdgeMap, null);
+      vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
+          destEdgeMap);
       destEdgeMap.clear();
       recordsForVertex.clear();
       ++recordCount;

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java Thu Oct 11 16:48:45 2012
@@ -87,7 +87,7 @@ public class AccumuloEdgeInputFormat
               String edge = new String(value.get());
               Text edgeId = new Text(edge);
               edges.put(edgeId, uselessEdgeValue);
-              vertex.initialize(vertexId, new Text(), edges, null);
+              vertex.initialize(vertexId, new Text(), edges);
 
             return vertex;
         }

Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java Thu Oct 11 16:48:45 2012
@@ -93,7 +93,7 @@ public class TableEdgeInputFormat extend
             Text vertexValue = new Text();
             Text edgeId = new Text(edge);
             edges.put(edgeId, uselessEdgeValue);
-            vertex.initialize(vertexId, vertexValue, edges, null);
+            vertex.initialize(vertexId, vertexValue, edges);
 
             return vertex;
         }

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Thu Oct 11 16:48:45 2012
@@ -558,7 +558,7 @@ under the License.
             </executions>
 	    <!-- profile: hadoop_0.20.203 -->
             <configuration>
-              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
+              <symbols>HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -608,7 +608,7 @@ under the License.
             </executions>
 	    <!-- profile: hadoop_1.0 -->
             <configuration>
-              <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
+              <symbols>HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
             </configuration>
           </plugin>
         </plugins>
@@ -657,17 +657,17 @@ under the License.
             </executions>
             <configuration>
 	      <!-- profile: hadoop_non_secure -->
-              <symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
-	      <excludes>**/comm/SecureRPCCommunications.java,
-                        **/comm/netty/SaslNettyClient.java,
-                        **/comm/netty/SaslNettyServer.java,
-                        **/comm/netty/handler/AuthorizeServerHandler.java,
-                        **/comm/netty/handler/SaslClientHandler.java,
-                        **/comm/netty/handler/SaslServerHandler.java,
-                        **/comm/requests/SaslCompleteRequest.java,
-                        **/comm/requests/SaslTokenMessageRequest.java,
-                        **/comm/SaslConnectionTest.java,
-                        **/hadoop/BspTokenSelector.java</excludes>
+              <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+	      <excludes>
+              **/comm/netty/SaslNettyClient.java,
+              **/comm/netty/SaslNettyServer.java,
+              **/comm/netty/handler/AuthorizeServerHandler.java,
+              **/comm/netty/handler/SaslClientHandler.java,
+              **/comm/netty/handler/SaslServerHandler.java,
+              **/comm/requests/SaslCompleteRequest.java,
+              **/comm/requests/SaslTokenMessageRequest.java,
+              **/comm/SaslConnectionTest.java
+          </excludes>
             </configuration>
           </plugin>
           <plugin>
@@ -720,10 +720,6 @@ under the License.
         <resources>
           <resource>
             <directory>${basedir}/src/main/java/org/apache/giraph/hadoop</directory>
-            <excludes>
-              <exclude>BspTokenSelector.java</exclude>
-	      <exclude>SecureRPCCommunications.java</exclude>
-	    </excludes>
 	  </resource>
 	  <resource>
             <directory>${basedir}/src/main/java/org/apache/giraph/comm/netty</directory>
@@ -735,10 +731,6 @@ under the License.
             <artifactId>maven-compiler-plugin</artifactId>
             <version>${maven-compiler-plugin.version}</version>
             <configuration>
-              <excludes>
-                <exclude>**/BspTokenSelector.java</exclude>
-		<exclude>**/SecureRPCCommunications.java</exclude>
-              </excludes>
               <source>${compileSource}</source>
               <target>${compileSource}</target>
             </configuration>
@@ -759,16 +751,16 @@ under the License.
 	    <!-- profile: hadoop_facebook -->
             <configuration>
               <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
-	      <excludes>**/comm/SecureRPCCommunications.java,
-                        **/comm/netty/SaslNettyClient.java,
-                        **/comm/netty/SaslNettyServer.java,
-                        **/comm/netty/handler/AuthorizeServerHandler.java,
-                        **/comm/netty/handler/SaslClientHandler.java,
-                        **/comm/netty/handler/SaslServerHandler.java,
-                        **/comm/requests/SaslCompleteRequest.java,
-                        **/comm/requests/SaslTokenMessageRequest.java,
-                        **/comm/SaslConnectionTest.java,
-                        **/hadoop/BspTokenSelector.java</excludes>
+	      <excludes>
+              **/comm/netty/SaslNettyClient.java,
+              **/comm/netty/SaslNettyServer.java,
+              **/comm/netty/handler/AuthorizeServerHandler.java,
+              **/comm/netty/handler/SaslClientHandler.java,
+              **/comm/netty/handler/SaslServerHandler.java,
+              **/comm/requests/SaslCompleteRequest.java,
+              **/comm/requests/SaslTokenMessageRequest.java,
+              **/comm/SaslConnectionTest.java
+          </excludes>
             </configuration>
           </plugin>
           <plugin>

Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Thu Oct 11 16:48:45 2012
@@ -156,11 +156,6 @@ public class GiraphConfiguration extends
   /** Local ZooKeeper directory to use */
   public static final String ZOOKEEPER_DIR = "giraph.zkDir";
 
-  /** Use the RPC communication or netty communication */
-  public static final String USE_NETTY = "giraph.useNetty";
-  /** Default is to use RPC, not netty */
-  public static final boolean USE_NETTY_DEFAULT = false;
-
   /** TCP backlog (defaults to number of workers) */
   public static final String TCP_BACKLOG = "giraph.tcpBacklog";
   /**
@@ -278,29 +273,24 @@ public class GiraphConfiguration extends
   /** Default Netty max connection failures */
   public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
 
-  /** Initial port to start using for the RPC communication */
-  public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
-  /** Default port to start using for the RPC communication */
-  public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
-
-  /** Maximum bind attempts for different RPC ports */
-  public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
-      "giraph.maxRpcPortBindAttempts";
-  /** Default maximum bind attempts for different RPC ports */
-  public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+  /** Initial port to start using for the IPC communication */
+  public static final String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
+  /** Default port to start using for the IPC communication */
+  public static final int IPC_INITIAL_PORT_DEFAULT = 30000;
+
+  /** Maximum bind attempts for different IPC ports */
+  public static final String MAX_IPC_PORT_BIND_ATTEMPTS =
+      "giraph.maxIpcPortBindAttempts";
+  /** Default maximum bind attempts for different IPC ports */
+  public static final int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
   /**
-   * Fail first RPC port binding attempt, simulate binding failure
+   * Fail first IPC port binding attempt, simulate binding failure
    * on real grid testing
    */
-  public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
-      "giraph.failFirstRpcPortBindAttempt";
-  /** Default fail first RPC port binding attempt flag */
-  public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
-
-  /** Maximum number of RPC handlers */
-  public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
-  /** Default maximum number of RPC handlers */
-  public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
+  public static final String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+      "giraph.failFirstIpcPortBindAttempt";
+  /** Default fail first IPC port binding attempt flag */
+  public static final boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
 
   /** Client send buffer size */
   public static final String CLIENT_SEND_BUFFER_SIZE =
@@ -762,10 +752,6 @@ public class GiraphConfiguration extends
     return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
   }
 
-  public boolean getUseNetty() {
-    return getBoolean(USE_NETTY, USE_NETTY_DEFAULT);
-  }
-
   public int getZooKeeperServerCount() {
     return getInt(ZOOKEEPER_SERVER_COUNT,
         ZOOKEEPER_SERVER_COUNT_DEFAULT);

Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Oct 11 16:48:45 2012
@@ -167,15 +167,6 @@ public interface CentralizedServiceWorke
       Collection<? extends PartitionOwner> masterSetPartitionOwners);
 
   /**
-   * Assign messages to a vertex (bypasses package-private access to
-   * setMessages() for internal classes).
-   *
-   * @param vertex Vertex (owned by worker)
-   * @param messages Messages to assign to the vertex
-   */
-  void assignMessagesToVertex(Vertex<I, V, E, M> vertex, Iterable<M> messages);
-
-  /**
    * Get master info
    *
    * @return Master info

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Thu Oct 11 16:48:45 2012
@@ -44,7 +44,7 @@ public interface WorkerServer<I extends 
 
   /**
    * Move the in transition messages to the in messages for every vertex and
-   * add new connections to any newly appearing RPC proxies.
+   * add new connections to any newly appearing IPC proxies.
    */
   void prepareSuperstep();
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Thu Oct 11 16:48:45 2012
@@ -148,8 +148,11 @@ public class SimpleMessageStore<I extend
   public Collection<M> getVertexMessages(I vertexId) throws IOException {
     ConcurrentMap<I, Collection<M>> partitionMap =
         map.get(getPartitionId(vertexId));
-    return (partitionMap == null) ? Collections.<M>emptyList() :
-        partitionMap.get(vertexId);
+    if (partitionMap == null) {
+      return Collections.<M>emptyList();
+    }
+    Collection<M> messages = partitionMap.get(vertexId);
+    return (messages == null) ? Collections.<M>emptyList() : messages;
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java Thu Oct 11 16:48:45 2012
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of communication related objects, RPC service.
+ * Package of communication related objects, IPC service.
  */
 package org.apache.giraph.comm.messages;

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Thu Oct 11 16:48:45 2012
@@ -279,27 +279,27 @@ else[HADOOP_NON_SECURE]*/
     int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
     int portIncrementConstant =
         (int) Math.pow(10, Math.ceil(Math.log10(numServers)));
-    int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
-        GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
+    int bindPort = conf.getInt(GiraphConfiguration.IPC_INITIAL_PORT,
+        GiraphConfiguration.IPC_INITIAL_PORT_DEFAULT) +
         taskId;
     int bindAttempts = 0;
-    final int maxRpcPortBindAttempts =
-        conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
-            GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+    final int maxIpcPortBindAttempts =
+        conf.getInt(GiraphConfiguration.MAX_IPC_PORT_BIND_ATTEMPTS,
+            GiraphConfiguration.MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT);
     final boolean failFirstPortBindingAttempt =
-        conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
-            GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+        conf.getBoolean(GiraphConfiguration.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT,
+            GiraphConfiguration.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT);
 
     // Simple handling of port collisions on the same machine while
     // preserving debugability from the port number alone.
     // Round up the max number of workers to the next power of 10 and use
     // it as a constant to increase the port number with.
-    while (bindAttempts < maxRpcPortBindAttempts) {
+    while (bindAttempts < maxIpcPortBindAttempts) {
       this.myAddress = new InetSocketAddress(localHostname, bindPort);
       if (failFirstPortBindingAttempt && bindAttempts == 0) {
         if (LOG.isInfoEnabled()) {
           LOG.info("start: Intentionally fail first " +
-              "binding attempt as giraph.failFirstRpcPortBindAttempt " +
+              "binding attempt as giraph.failFirstIpcPortBindAttempt " +
               "is true, port " + bindPort);
         }
         ++bindAttempts;
@@ -319,7 +319,7 @@ else[HADOOP_NON_SECURE]*/
         bindPort += portIncrementConstant;
       }
     }
-    if (bindAttempts == maxRpcPortBindAttempts || myAddress == null) {
+    if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
       throw new IllegalStateException(
           "start: Failed to start NettyServer with " +
               bindAttempts + " attempts");

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Thu Oct 11 16:48:45 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.comm.netty;
 
-import java.util.Iterator;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -49,6 +48,7 @@ import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -134,7 +134,7 @@ public class NettyWorkerClient<I extends
   @Override
   public void fixPartitionIdToSocketAddrMap() {
     // 1. Fix all the cached inet addresses (remove all changed entries)
-    // 2. Connect to any new RPC servers
+    // 2. Connect to any new IPC servers
     Map<InetSocketAddress, Integer> addressTaskIdMap =
         Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
     for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
@@ -165,12 +165,8 @@ public class NettyWorkerClient<I extends
             partitionOwner.getWorkerInfo().getTaskId());
       }
     }
-    boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
-        GiraphConfiguration.USE_NETTY_DEFAULT);
-    if (useNetty) {
-      addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
-                           null);
-    }
+    addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
+        null);
     nettyClient.connectAllAddresses(addressTaskIdMap);
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java Thu Oct 11 16:48:45 2012
@@ -16,6 +16,6 @@
  * limitations under the License.
  */
 /**
- * Package of communication related objects, RPC service.
+ * Package of communication related objects, IPC service.
  */
 package org.apache.giraph.comm;

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Thu Oct 11 16:48:45 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -78,8 +77,7 @@ public class LongDoubleFloatDoubleTextIn
       }
 
       LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
-      vertex.initialize(vertexId, new DoubleWritable(), edges,
-          Lists.<DoubleWritable>newArrayList());
+      vertex.initialize(vertexId, new DoubleWritable(), edges);
 
       return vertex;
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Thu Oct 11 16:48:45 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -78,8 +77,7 @@ public class NormalizingLongDoubleFloatD
       normalize(edges);
 
       LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
-      vertex.initialize(vertexId, new DoubleWritable(), edges,
-          Lists.<DoubleWritable>newArrayList());
+      vertex.initialize(vertexId, new DoubleWritable(), edges);
 
       return vertex;
     }

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Thu Oct 11 16:48:45 2012
@@ -87,7 +87,7 @@ public class SimpleMutateGraphVertex ext
           new LongWritable(rangeVertexIdStart(3) + getId().get());
       Vertex<LongWritable, DoubleWritable,
             FloatWritable, DoubleWritable> vertex =
-        instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null);
+        instantiateVertex(vertexIndex, new DoubleWritable(0.0), null);
       addVertexRequest(vertex);
       // Add edges to those remote vertices as well
       addEdgeRequest(vertexIndex,

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Oct 11 16:48:45 2012
@@ -195,7 +195,7 @@ public class SimplePageRankVertex extend
       float edgeValue = vertexId.get() * 100f;
       Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
       edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue));
-      vertex.initialize(vertexId, vertexValue, edges, null);
+      vertex.initialize(vertexId, vertexValue, edges);
       ++recordsRead;
       if (LOG.isInfoEnabled()) {
         LOG.info("next: Return vertexId=" + vertex.getId().get() +

Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Oct 11 16:48:45 2012
@@ -84,7 +84,7 @@ public class SimpleSuperstepVertex exten
       float edgeValue = vertexId.get() * 100f;
       edgeMap.put(new LongWritable(targetVertexId),
           new FloatWritable(edgeValue));
-      vertex.initialize(vertexId, vertexValue, edgeMap, null);
+      vertex.initialize(vertexId, vertexValue, edgeMap);
       ++recordsRead;
       if (LOG.isInfoEnabled()) {
         LOG.info("next: Return vertexId=" + vertex.getId().get() +

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Oct 11 16:48:45 2012
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.graph;
 
-import com.google.common.collect.Sets;
 import org.apache.giraph.GiraphConfiguration;
 import org.apache.giraph.bsp.ApplicationState;
 import org.apache.giraph.bsp.BspInputFormat;
@@ -56,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.json.JSONException;
 import org.json.JSONObject;
 
+import com.google.common.collect.Sets;
 import net.iharder.Base64;
 
 import java.io.ByteArrayOutputStream;
@@ -762,15 +762,13 @@ public class BspServiceMaster<I extends 
           aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
           aggregatorHandler.initialize(this);
 
-          if (getConfiguration().getUseNetty()) {
-            commService = new NettyMasterClientServer(
-                getContext(), getConfiguration());
-            masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
-                commService.getMyAddress().getPort());
-            // write my address to znode so workers could read it
-            WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
-                masterInfo);
-          }
+          commService = new NettyMasterClientServer(
+              getContext(), getConfiguration());
+          masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
+              commService.getMyAddress().getPort());
+          // write my address to znode so workers could read it
+          WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
+              masterInfo);
 
           if (LOG.isInfoEnabled()) {
             LOG.info("becomeMaster: I am now the master!");
@@ -1273,9 +1271,7 @@ public class BspServiceMaster<I extends 
       }
     }
 
-    if (getConfiguration().getUseNetty()) {
-      commService.fixWorkerAddresses(chosenWorkerInfoList);
-    }
+    commService.fixWorkerAddresses(chosenWorkerInfoList);
 
     currentWorkersCounter.increment(chosenWorkerInfoList.size() -
         currentWorkersCounter.getValue());
@@ -1549,10 +1545,8 @@ public class BspServiceMaster<I extends 
       }
       aggregatorHandler.close();
 
-      if (getConfiguration().getUseNetty()) {
-        commService.closeConnections();
-        commService.close();
-      }
+      commService.closeConnections();
+      commService.close();
     }
 
     try {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Oct 11 16:48:45 2012
@@ -24,17 +24,11 @@ import org.apache.giraph.bsp.Centralized
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClientServer;
 import org.apache.giraph.comm.netty.NettyWorkerClientServer;
-/*if[HADOOP_NON_SECURE]
-import org.apache.giraph.comm.RPCCommunications;
-else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.comm.SecureRPCCommunications;
-/*end[HADOOP_NON_SECURE]*/
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionExchange;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.PartitionStore;
-import org.apache.giraph.graph.partition.SimplePartitionStore;
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.WritableUtils;
@@ -121,10 +115,6 @@ public class BspServiceWorker<I extends 
   /** Input split max vertices (-1 denotes all) */
   private final long inputSplitMaxVertices;
   /**
-   * Partition store for worker (only used by the Hadoop RPC implementation).
-   */
-  private final PartitionStore<I, V, E, M> workerPartitionStore;
-  /**
    * Stores and processes the list of InputSplits advertised
    * in a tree of child znodes by the master.
    */
@@ -162,27 +152,14 @@ public class BspServiceWorker<I extends 
             GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
     workerGraphPartitioner =
         getGraphPartitionerFactory().createWorkerGraphPartitioner();
-    boolean useNetty = getConfiguration().getUseNetty();
-    if (useNetty) {
-      commService =  new NettyWorkerClientServer<I, V, E, M>(
-          context, getConfiguration(), this);
-    } else {
-/*if[HADOOP_NON_SECURE]
-      commService =
-          new RPCCommunications<I, V, E, M>(context, this, getConfiguration(),
-          graphState);
-else[HADOOP_NON_SECURE]*/
-      commService =
-        new SecureRPCCommunications<I, V, E, M>(context, this,
-          getConfiguration(), graphState);
-/*end[HADOOP_NON_SECURE]*/
-    }
+    commService =  new NettyWorkerClientServer<I, V, E, M>(
+        context, getConfiguration(), this);
+
     if (LOG.isInfoEnabled()) {
       LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
           transferRegulator.getMaxVerticesPerTransfer());
       LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
-          transferRegulator.getMaxEdgesPerTransfer() +
-          " useNetty = " + useNetty);
+          transferRegulator.getMaxEdgesPerTransfer());
     }
 
     workerInfo = new WorkerInfo(
@@ -192,14 +169,6 @@ else[HADOOP_NON_SECURE]*/
     this.workerContext =
         getConfiguration().createWorkerContext(graphMapper.getGraphState());
 
-    if (useNetty) {
-      workerPartitionStore = null;
-    } else {
-      workerPartitionStore =
-          new SimplePartitionStore<I, V, E, M>(getConfiguration(),
-              getContext());
-    }
-
     aggregatorHandler = new WorkerAggregatorHandler();
   }
 
@@ -542,12 +511,6 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public void assignMessagesToVertex(Vertex<I, V, E, M> vertex,
-      Iterable<M> messages) {
-    vertex.putMessages(messages);
-  }
-
-  @Override
   public WorkerInfo getMasterInfo() {
     return masterInfo;
   }
@@ -840,12 +803,9 @@ else[HADOOP_NON_SECURE]*/
           "startSuperstep: InterruptedException getting assignments", e);
     }
 
-
-    if (getConfiguration().getUseNetty()) {
-      // get address of master
-      WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
-          null, masterInfo);
-    }
+    // get address of master
+    WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
+        null, masterInfo);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("startSuperstep: Ready for computation on superstep " +
@@ -1097,7 +1057,6 @@ else[HADOOP_NON_SECURE]*/
       LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
     }
 
-    boolean useNetty = getConfiguration().getUseNetty();
     FSDataOutputStream verticesOutputStream =
         getFs().create(verticesFilePath);
     ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
@@ -1107,11 +1066,8 @@ else[HADOOP_NON_SECURE]*/
       long startPos = verticesOutputStream.getPos();
       partition.write(verticesOutputStream);
       // write messages
-      verticesOutputStream.writeBoolean(useNetty);
-      if (useNetty) {
-        getServerData().getCurrentMessageStore().writePartition(
-            verticesOutputStream, partition.getId());
-      }
+      getServerData().getCurrentMessageStore().writePartition(
+          verticesOutputStream, partition.getId());
       // Write the metadata for this partition
       // Format:
       // <index count>
@@ -1166,16 +1122,13 @@ else[HADOOP_NON_SECURE]*/
 
   @Override
   public void loadCheckpoint(long superstep) {
-    if (getConfiguration().getBoolean(GiraphConfiguration.USE_NETTY,
-        GiraphConfiguration.USE_NETTY_DEFAULT)) {
-      try {
-        // clear old message stores
-        getServerData().getIncomingMessageStore().clearAll();
-        getServerData().getCurrentMessageStore().clearAll();
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "loadCheckpoint: Failed to clear message stores ", e);
-      }
+    try {
+      // clear old message stores
+      getServerData().getIncomingMessageStore().clearAll();
+      getServerData().getCurrentMessageStore().clearAll();
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "loadCheckpoint: Failed to clear message stores ", e);
     }
 
     // Algorithm:
@@ -1462,11 +1415,7 @@ else[HADOOP_NON_SECURE]*/
 
   @Override
   public PartitionStore<I, V, E, M> getPartitionStore() {
-    if (workerPartitionStore != null) {
-      return workerPartitionStore;
-    } else {
-      return getServerData().getPartitionStore();
-    }
+    return getServerData().getPartitionStore();
   }
 
   @Override

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Oct 11 16:48:45 2012
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.DataInput;
@@ -50,20 +49,14 @@ public abstract class EdgeListVertex<I e
   private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
   /** List of edges */
   private List<Edge<I, E>> edgeList = Lists.newArrayList();
-  /** List of incoming messages from the previous superstep */
-  private List<M> messageList = Lists.newArrayList();
 
   @Override
-  public void initialize(I id, V value, Map<I, E> edges, Iterable<M> messages) {
-    super.initialize(id, value);
+  public void setEdges(Map<I, E> edges) {
     if (edges != null) {
       for (Map.Entry<I, E> edge : edges.entrySet()) {
         edgeList.add(new Edge<I, E>(edge.getKey(), edge.getValue()));
       }
     }
-    if (messages != null) {
-      Iterables.<M>addAll(messageList, messages);
-    }
   }
 
   @Override
@@ -104,28 +97,12 @@ public abstract class EdgeListVertex<I e
   }
 
   @Override
-  void putMessages(Iterable<M> messages) {
-    messageList.clear();
-    Iterables.addAll(messageList, messages);
-  }
-
-  @Override
-  public Iterable<M> getMessages() {
-    return Iterables.unmodifiableIterable(messageList);
-  }
-
-  @Override
-  public int getNumMessages() {
-    return messageList.size();
-  }
-
-  @Override
   public final void readFields(DataInput in) throws IOException {
     I vertexId = getConf().createVertexId();
     vertexId.readFields(in);
     V vertexValue = getConf().createVertexValue();
     vertexValue.readFields(in);
-    super.initialize(vertexId, vertexValue);
+    initialize(vertexId, vertexValue);
 
     int numEdges = in.readInt();
     edgeList = Lists.newArrayListWithCapacity(numEdges);
@@ -137,14 +114,6 @@ public abstract class EdgeListVertex<I e
       edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
     }
 
-    int numMessages = in.readInt();
-    messageList = Lists.newArrayListWithCapacity(numMessages);
-    for (int i = 0; i < numMessages; ++i) {
-      M message = getConf().createMessageValue();
-      message.readFields(in);
-      messageList.add(message);
-    }
-
     boolean halt = in.readBoolean();
     if (halt) {
       voteToHalt();
@@ -164,18 +133,8 @@ public abstract class EdgeListVertex<I e
       edge.getValue().write(out);
     }
 
-    out.writeInt(messageList.size());
-    for (M message : messageList) {
-      message.write(out);
-    }
-
     out.writeBoolean(isHalted());
   }
 
-  @Override
-  void releaseResources() {
-    // Hint to GC to free the messages
-    messageList.clear();
-  }
 }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Oct 11 16:48:45 2012
@@ -39,8 +39,6 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 
-import com.google.common.collect.Iterables;
-
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.net.URL;
@@ -471,11 +469,8 @@ public class GraphMapper<I extends Writa
       serviceWorker.getWorkerContext().preSuperstep();
       context.progress();
 
-      boolean useNetty = conf.getUseNetty();
-      MessageStoreByPartition<I, M> messageStore = null;
-      if (useNetty) {
-        messageStore = serviceWorker.getServerData().getCurrentMessageStore();
-      }
+      MessageStoreByPartition<I, M> messageStore =
+          serviceWorker.getServerData().getCurrentMessageStore();
 
       partitionStatsList.clear();
       TimedLogger partitionLogger = new TimedLogger(15000, LOG);
@@ -489,27 +484,16 @@ public class GraphMapper<I extends Writa
           // graphState before computing
           vertex.setGraphState(graphState);
 
-          Collection<M> messages = null;
-          if (useNetty) {
-            messages = messageStore.getVertexMessages(vertex.getId());
-            messageStore.clearVertexMessages(vertex.getId());
-          }
+          Collection<M> messages =
+              messageStore.getVertexMessages(vertex.getId());
+          messageStore.clearVertexMessages(vertex.getId());
 
-          boolean hasMessages = (messages != null && !messages.isEmpty()) ||
-              !Iterables.isEmpty(vertex.getMessages());
-          if (vertex.isHalted() && hasMessages) {
+          if (vertex.isHalted() && !messages.isEmpty()) {
             vertex.wakeUp();
           }
           if (!vertex.isHalted()) {
-            Iterable<M> vertexMsgIt;
-            if (messages == null) {
-              vertexMsgIt = vertex.getMessages();
-            } else {
-              vertexMsgIt = messages;
-            }
             context.progress();
-            vertex.compute(vertexMsgIt);
-            vertex.releaseResources();
+            vertex.compute(messages);
           }
           if (vertex.isHalted()) {
             partitionStats.incrFinishedVertexCount();
@@ -518,9 +502,7 @@ public class GraphMapper<I extends Writa
           partitionStats.addEdgeCount(vertex.getNumEdges());
         }
 
-        if (useNetty) {
-          messageStore.clearPartition(partition.getId());
-        }
+        messageStore.clearPartition(partition.getId());
 
         partitionStatsList.add(partitionStats);
         ++completedPartitions;

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Thu Oct 11 16:48:45 2012
@@ -24,14 +24,12 @@ import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,17 +54,10 @@ public abstract class HashMapVertex<I ex
   private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
   /** Map of target vertices and their edge values */
   protected Map<I, E> edgeMap = new HashMap<I, E>();
-  /** List of incoming messages from the previous superstep */
-  private List<M> messageList = Lists.newArrayList();
 
   @Override
-  public void initialize(
-      I id, V value, Map<I, E> edges, Iterable<M> messages) {
-    super.initialize(id, value);
+  public void setEdges(Map<I, E> edges) {
     edgeMap.putAll(edges);
-    if (messages != null) {
-      Iterables.<M>addAll(messageList, messages);
-    }
   }
 
   @Override
@@ -129,28 +120,12 @@ public abstract class HashMapVertex<I ex
   }
 
   @Override
-  void putMessages(Iterable<M> messages) {
-    messageList.clear();
-    Iterables.addAll(messageList, messages);
-  }
-
-  @Override
-  public Iterable<M> getMessages() {
-    return Iterables.unmodifiableIterable(messageList);
-  }
-
-  @Override
-  public int getNumMessages() {
-    return messageList.size();
-  }
-
-  @Override
   public final void readFields(DataInput in) throws IOException {
     I vertexId = getConf().createVertexId();
     vertexId.readFields(in);
     V vertexValue = getConf().createVertexValue();
     vertexValue.readFields(in);
-    super.initialize(vertexId, vertexValue);
+    initialize(vertexId, vertexValue);
 
     int numEdges = in.readInt();
     edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
@@ -162,14 +137,6 @@ public abstract class HashMapVertex<I ex
       edgeMap.put(targetVertexId, edgeValue);
     }
 
-    int numMessages = in.readInt();
-    messageList = Lists.newArrayListWithCapacity(numMessages);
-    for (int i = 0; i < numMessages; ++i) {
-      M message = getConf().createMessageValue();
-      message.readFields(in);
-      messageList.add(message);
-    }
-
     boolean halt = in.readBoolean();
     if (halt) {
       voteToHalt();
@@ -189,18 +156,8 @@ public abstract class HashMapVertex<I ex
       edge.getValue().write(out);
     }
 
-    out.writeInt(messageList.size());
-    for (M message : messageList) {
-      message.write(out);
-    }
-
     out.writeBoolean(isHalted());
   }
 
-  @Override
-  void releaseResources() {
-    // Hint to GC to free the messages
-    messageList.clear();
-  }
 }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Thu Oct 11 16:48:45 2012
@@ -20,15 +20,12 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.Iterables;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
 
 /**
  * Simple implementation of {@link Vertex} using an int as id, value and
@@ -39,26 +36,14 @@ public abstract class IntIntNullIntVerte
     SimpleVertex<IntWritable, IntWritable, IntWritable> {
   /** Int array of neighbor vertex ids */
   private int[] neighbors;
-  /** Int array of messages */
-  private int[] messages;
 
   @Override
-  public void initialize(IntWritable id, IntWritable value,
-                         Map<IntWritable, NullWritable> edges,
-                         Iterable<IntWritable> messages) {
-    super.initialize(id, value);
-    this.neighbors = new int[(edges != null) ? edges.size() : 0];
+  public void setNeighbors(Set<IntWritable> neighborSet) {
+    neighbors = new int[(neighborSet != null) ? neighborSet.size() : 0];
     int n = 0;
-    if (edges != null) {
-      for (Map.Entry<IntWritable, NullWritable> edge : edges.entrySet()) {
-        this.neighbors[n++] = edge.getKey().get();
-      }
-    }
-    this.messages = new int[(messages != null) ? Iterables.size(messages) : 0];
-    if (messages != null) {
-      n = 0;
-      for (IntWritable message : messages) {
-        this.messages[n++] = message.get();
+    if (neighborSet != null) {
+      for (IntWritable neighbor : neighborSet) {
+        neighbors[n++] = neighbor.get();
       }
     }
   }
@@ -89,35 +74,6 @@ public abstract class IntIntNullIntVerte
   }
 
   @Override
-  public Iterable<IntWritable> getMessages() {
-    return new Iterable<IntWritable>() {
-      @Override
-      public Iterator<IntWritable> iterator() {
-        return new UnmodifiableIntArrayIterator(messages);
-      }
-    };
-  }
-
-  @Override
-  public void putMessages(Iterable<IntWritable> newMessages) {
-    messages = new int[Iterables.size(newMessages)];
-    int n = 0;
-    for (IntWritable message : newMessages) {
-      messages[n++] = message.get();
-    }
-  }
-
-  @Override
-  public int getNumMessages() {
-    return messages.length;
-  }
-
-  @Override
-  void releaseResources() {
-    messages = new int[0];
-  }
-
-  @Override
   public void write(final DataOutput out) throws IOException {
     out.writeInt(getId().get());
     out.writeInt(getValue().get());
@@ -125,10 +81,6 @@ public abstract class IntIntNullIntVerte
     for (int n = 0; n < neighbors.length; n++) {
       out.writeInt(neighbors[n]);
     }
-    out.writeInt(messages.length);
-    for (int n = 0; n < messages.length; n++) {
-      out.writeInt(messages[n]);
-    }
     out.writeBoolean(isHalted());
   }
 
@@ -136,17 +88,12 @@ public abstract class IntIntNullIntVerte
   public void readFields(DataInput in) throws IOException {
     int id = in.readInt();
     int value = in.readInt();
-    super.initialize(new IntWritable(id), new IntWritable(value));
+    initialize(new IntWritable(id), new IntWritable(value));
     int numEdges = in.readInt();
     neighbors = new int[numEdges];
     for (int n = 0; n < numEdges; n++) {
       neighbors[n] = in.readInt();
     }
-    int numMessages = in.readInt();
-    messages = new int[numMessages];
-    for (int n = 0; n < numMessages; n++) {
-      messages[n] = in.readInt();
-    }
     boolean halt = in.readBoolean();
     if (halt) {
       voteToHalt();

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java Thu Oct 11 16:48:45 2012
@@ -21,24 +21,19 @@ package org.apache.giraph.graph;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
 import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 
-import com.google.common.collect.Iterables;
-
 /**
  * Compact vertex representation with primitive arrays.
  */
 public abstract class LongDoubleFloatDoubleEdgeListVertex extends
     Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-
   /** long represented vertex id */
   private long id;
   /** double represented vertex value */
@@ -47,13 +42,10 @@ public abstract class LongDoubleFloatDou
   private long[] neighbors;
   /** float array of edge weights */
   private float[] edgeWeights;
-  /** double array of messages */
-  private double[] messages;
 
   @Override
   public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
-          Map<LongWritable, FloatWritable> edges,
-          Iterable<DoubleWritable> messages) {
+                         Map<LongWritable, FloatWritable> edges) {
     if (vertexId != null) {
       id = vertexId.get();
     }
@@ -70,12 +62,18 @@ public abstract class LongDoubleFloatDou
         n++;
       }
     }
-    this.messages =
-        new double[(messages != null) ? Iterables.size(messages) : 0];
-    if (messages != null) {
+  }
+
+  @Override
+  public void setEdges(Map<LongWritable, FloatWritable> edges) {
+    neighbors = new long[(edges != null) ? edges.size() : 0];
+    edgeWeights = new float[(edges != null) ? edges.size() : 0];
+    if (edges != null) {
       int n = 0;
-      for (DoubleWritable message : messages) {
-        this.messages[n++] = message.get();
+      for (Entry<LongWritable, FloatWritable> neighbor : edges.entrySet()) {
+        neighbors[n] = neighbor.getKey().get();
+        edgeWeights[n] = neighbor.getValue().get();
+        n++;
       }
     }
   }
@@ -135,30 +133,6 @@ public abstract class LongDoubleFloatDou
   }
 
   @Override
-  public Iterable<DoubleWritable> getMessages() {
-    return new Iterable<DoubleWritable>() {
-      @Override
-      public Iterator<DoubleWritable> iterator() {
-        return new UnmodifiableDoubleArrayIterator(messages);
-      }
-    };
-  }
-
-  @Override
-  public void putMessages(Iterable<DoubleWritable> newMessages) {
-    messages = new double[Iterables.size(newMessages)];
-    int n = 0;
-    for (DoubleWritable message : newMessages) {
-      messages[n++] = message.get();
-    }
-  }
-
-  @Override
-  void releaseResources() {
-    messages = new double[0];
-  }
-
-  @Override
   public void write(final DataOutput out) throws IOException {
     out.writeLong(id);
     out.writeDouble(value);
@@ -169,10 +143,6 @@ public abstract class LongDoubleFloatDou
     for (int n = 0; n < edgeWeights.length; n++) {
       out.writeFloat(edgeWeights[n]);
     }
-    out.writeInt(messages.length);
-    for (int n = 0; n < messages.length; n++) {
-      out.writeDouble(messages[n]);
-    }
   }
 
   @Override
@@ -188,11 +158,6 @@ public abstract class LongDoubleFloatDou
     for (int n = 0; n < numEdges; n++) {
       edgeWeights[n] = in.readFloat();
     }
-    int numMessages = in.readInt();
-    messages = new double[numMessages];
-    for (int n = 0; n < numMessages; n++) {
-      messages[n] = in.readDouble();
-    }
   }
 
   @Override
@@ -204,11 +169,6 @@ public abstract class LongDoubleFloatDou
   }
 
   @Override
-  public int getNumMessages() {
-    return messages.length;
-  }
-
-  @Override
   public boolean equals(Object obj) {
     if (this == obj) {
       return true;

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Oct 11 16:48:45 2012
@@ -21,7 +21,6 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.log4j.Logger;
-import org.apache.mahout.math.function.DoubleProcedure;
 import org.apache.mahout.math.function.LongFloatProcedure;
 import org.apache.mahout.math.list.DoubleArrayList;
 import org.apache.mahout.math.map.OpenLongFloatHashMap;
@@ -47,24 +46,13 @@ public abstract class LongDoubleFloatDou
   /** Stores the edges */
   private OpenLongFloatHashMap edgeMap =
       new OpenLongFloatHashMap();
-  /** Message list storage */
-  private DoubleArrayList messageList = new DoubleArrayList();
 
   @Override
-  public void initialize(LongWritable id, DoubleWritable value,
-                         Map<LongWritable, FloatWritable> edges,
-                         Iterable<DoubleWritable> messages) {
-    super.initialize(id, value);
+  public void setEdges(Map<LongWritable, FloatWritable> edges) {
     if (edges != null) {
       for (Map.Entry<LongWritable, FloatWritable> edge :
         edges.entrySet()) {
-        edgeMap.put(edge.getKey().get(),
-            edge.getValue().get());
-      }
-    }
-    if (messages != null) {
-      for (DoubleWritable m : messages) {
-        messageList.add(m.get());
+        edgeMap.put(edge.getKey().get(), edge.getValue().get());
       }
     }
   }
@@ -144,17 +132,13 @@ public abstract class LongDoubleFloatDou
   public final void readFields(DataInput in) throws IOException {
     long id = in.readLong();
     double value = in.readDouble();
-    super.initialize(new LongWritable(id), new DoubleWritable(value));
+    initialize(new LongWritable(id), new DoubleWritable(value));
     long edgeMapSize = in.readLong();
     for (long i = 0; i < edgeMapSize; ++i) {
       long targetVertexId = in.readLong();
       float edgeValue = in.readFloat();
       edgeMap.put(targetVertexId, edgeValue);
     }
-    long messageListSize = in.readLong();
-    for (long i = 0; i < messageListSize; ++i) {
-      messageList.add(in.readDouble());
-    }
     boolean halt = in.readBoolean();
     if (halt) {
       voteToHalt();
@@ -181,46 +165,9 @@ public abstract class LongDoubleFloatDou
         return true;
       }
     });
-    out.writeLong(messageList.size());
-    messageList.forEach(new DoubleProcedure() {
-      @Override
-      public boolean apply(double message) {
-        try {
-          out.writeDouble(message);
-        } catch (IOException e) {
-          throw new IllegalStateException(
-              "apply: IOException when not allowed", e);
-        }
-        return true;
-      }
-    });
     out.writeBoolean(isHalted());
   }
 
-  @Override
-  void putMessages(Iterable<DoubleWritable> messages) {
-    messageList.clear();
-    for (DoubleWritable message : messages) {
-      messageList.add(message.get());
-    }
-  }
-
-  @Override
-  void releaseResources() {
-    // Hint to GC to free the messages
-    messageList.clear();
-  }
-
-  @Override
-  public int getNumMessages() {
-    return messageList.size();
-  }
-
-  @Override
-  public Iterable<DoubleWritable> getMessages() {
-    return new UnmodifiableDoubleWritableIterable(messageList);
-  }
-
   /**
    * Helper iterable over the messages.
    */

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java Thu Oct 11 16:48:45 2012
@@ -21,36 +21,28 @@ package org.apache.giraph.graph;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
 import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 
-import com.google.common.collect.Iterables;
-
 /**
  * Compact vertex representation with primitive arrays and null edges.
  */
 public abstract class LongDoubleNullDoubleVertex extends
     Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
-
   /** long represented vertex id */
   private long id;
   /** double represented vertex value */
   private double value;
   /** long array of neighbor vertex ids */
   private long[] neighbors;
-  /** double array of messages */
-  private double[] messages;
 
   @Override
   public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
-      Map<LongWritable, NullWritable> edges,
-      Iterable<DoubleWritable> messages) {
+                         Map<LongWritable, NullWritable> edges) {
     id = vertexId.get();
     value = vertexValue.get();
     neighbors = new long[(edges != null) ? edges.size() : 0];
@@ -60,14 +52,6 @@ public abstract class LongDoubleNullDoub
         neighbors[n++] = neighbor.get();
       }
     }
-    this.messages =
-        new double[(messages != null) ? Iterables.size(messages) : 0];
-    if (messages != null) {
-      n = 0;
-      for (DoubleWritable message : messages) {
-        this.messages[n++] = message.get();
-      }
-    }
   }
 
   @Override
@@ -118,30 +102,6 @@ public abstract class LongDoubleNullDoub
   }
 
   @Override
-  public Iterable<DoubleWritable> getMessages() {
-    return new Iterable<DoubleWritable>() {
-      @Override
-      public Iterator<DoubleWritable> iterator() {
-        return new UnmodifiableDoubleArrayIterator(messages);
-      }
-    };
-  }
-
-  @Override
-  public void putMessages(Iterable<DoubleWritable> newMessages) {
-    messages = new double[Iterables.size(newMessages)];
-    int n = 0;
-    for (DoubleWritable message : newMessages) {
-      messages[n++] = message.get();
-    }
-  }
-
-  @Override
-  void releaseResources() {
-    messages = new double[0];
-  }
-
-  @Override
   public void write(final DataOutput out) throws IOException {
     out.writeLong(id);
     out.writeDouble(value);
@@ -149,10 +109,6 @@ public abstract class LongDoubleNullDoub
     for (int n = 0; n < neighbors.length; n++) {
       out.writeLong(neighbors[n]);
     }
-    out.writeInt(messages.length);
-    for (int n = 0; n < messages.length; n++) {
-      out.writeDouble(messages[n]);
-    }
   }
 
   @Override
@@ -164,10 +120,5 @@ public abstract class LongDoubleNullDoub
     for (int n = 0; n < numEdges; n++) {
       neighbors[n] = in.readLong();
     }
-    int numMessages = in.readInt();
-    messages = new double[numMessages];
-    for (int n = 0; n < numMessages; n++) {
-      messages[n] = in.readDouble();
-    }
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Thu Oct 11 16:48:45 2012
@@ -62,15 +62,14 @@ public abstract class MutableVertex<I ex
    * @param vertexId Id of the new vertex.
    * @param vertexValue Value of the new vertex.
    * @param edges Map of edges to be added to this vertex.
-   * @param messages Messages to be added to the vertex (typically empty)
    * @return A new vertex for adding to the graph
    */
   public Vertex<I, V, E, M> instantiateVertex(
-      I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+      I vertexId, V vertexValue, Map<I, E> edges) {
     MutableVertex<I, V, E, M> mutableVertex =
         (MutableVertex<I, V, E, M>) getConf().createVertex();
     mutableVertex.setGraphState(getGraphState());
-    mutableVertex.initialize(vertexId, vertexValue, edges, messages);
+    mutableVertex.initialize(vertexId, vertexValue, edges);
     return mutableVertex;
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Thu Oct 11 16:48:45 2012
@@ -28,10 +28,9 @@ import com.google.common.collect.Iterabl
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Mutable vertex with no edge values.
@@ -43,6 +42,18 @@ public abstract class SimpleMutableVerte
     V extends Writable, M extends Writable> extends MutableVertex<I, V,
     NullWritable, M> {
   /**
+   * Set the neighbors of this vertex.
+   *
+   * @param neighbors Set of destination vertex ids.
+   */
+  public abstract void setNeighbors(Set<I> neighbors);
+
+  @Override
+  public void setEdges(Map<I, NullWritable> edges) {
+    setNeighbors(edges.keySet());
+  }
+
+  /**
    * Get a read-only view of the neighbors of this
    * vertex, i.e. the target vertices of its out-edges.
    *
@@ -105,14 +116,7 @@ public abstract class SimpleMutableVerte
       edges.put(targetVertexId, NullWritable.get());
     }
 
-    int numMessages = in.readInt();
-    List<M> messages = new ArrayList<M>(numMessages);
-    for (int i = 0; i < numMessages; ++i) {
-      M message = (M) getConf().createMessageValue();
-      message.readFields(in);
-      messages.add(message);
-    }
-    initialize(vertexId, vertexValue, edges, messages);
+    initialize(vertexId, vertexValue, edges);
 
     boolean halt = in.readBoolean();
     if (halt) {
@@ -132,11 +136,6 @@ public abstract class SimpleMutableVerte
       neighbor.write(out);
     }
 
-    out.writeInt(getNumMessages());
-    for (M message : getMessages()) {
-      message.write(out);
-    }
-
     out.writeBoolean(isHalted());
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java Thu Oct 11 16:48:45 2012
@@ -28,10 +28,9 @@ import com.google.common.collect.Iterabl
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.annotation.Nullable;
 
@@ -45,6 +44,18 @@ public abstract class SimpleVertex<I ext
     V extends Writable, M extends Writable> extends Vertex<I, V,
     NullWritable, M> {
   /**
+   * Set the neighbors of this vertex.
+   *
+   * @param neighbors Set of destination vertex ids.
+   */
+  public abstract void setNeighbors(Set<I> neighbors);
+
+  @Override
+  public void setEdges(Map<I, NullWritable> edges) {
+    setNeighbors(edges.keySet());
+  }
+
+  /**
    * Get a read-only view of the neighbors of this
    * vertex, i.e. the target vertices of its out-edges.
    *
@@ -84,14 +95,7 @@ public abstract class SimpleVertex<I ext
       edges.put(targetVertexId, NullWritable.get());
     }
 
-    int numMessages = in.readInt();
-    List<M> messages = new ArrayList<M>(numMessages);
-    for (int i = 0; i < numMessages; ++i) {
-      M message = getConf().createMessageValue();
-      message.readFields(in);
-      messages.add(message);
-    }
-    initialize(vertexId, vertexValue, edges, messages);
+    initialize(vertexId, vertexValue, edges);
 
     boolean halt = in.readBoolean();
     if (halt) {
@@ -111,11 +115,6 @@ public abstract class SimpleVertex<I ext
       neighbor.write(out);
     }
 
-    out.writeInt(getNumMessages());
-    for (M message : getMessages()) {
-      message.write(out);
-    }
-
     out.writeBoolean(isHalted());
   }
 }

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Thu Oct 11 16:48:45 2012
@@ -25,13 +25,12 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.Mapper;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -58,7 +57,6 @@ public abstract class Vertex<I extends W
   /** Configuration */
   private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
 
-
   /**
    * This method must be called after instantiation of a vertex
    * with ImmutableClassesGiraphConfiguration
@@ -67,15 +65,17 @@ public abstract class Vertex<I extends W
    *
    * @param id Will be the vertex id
    * @param value Will be the vertex value
-   * @param edges A map of destination edge ids to edge values (can be null)
-   * @param messages Initial messages for this vertex (can be null)
+   * @param edges A map of destination vertex ids to edge values
    */
-  public abstract void initialize(
-      I id, V value, Map<I, E> edges, Iterable<M> messages);
+  public void initialize(I id, V value, Map<I, E> edges) {
+    this.id = id;
+    this.value = value;
+    setEdges(edges);
+  }
 
   /**
-   * This method must be called once by the subclass's initialize() or by
-   * readFields() in order to set id and value.
+   * This method only sets id and value. Can be used by Vertex
+   * implementations in readFields().
    *
    * @param id Vertex id
    * @param value Vertex value
@@ -83,9 +83,17 @@ public abstract class Vertex<I extends W
   public void initialize(I id, V value) {
     this.id = id;
     this.value = value;
+    setEdges(Maps.<I, E>newHashMapWithExpectedSize(0));
   }
 
   /**
+   * Set the outgoing edges for this vertex.
+   *
+   * @param edges Map of destination vertices to edge values
+   */
+  public abstract void setEdges(Map<I, E> edges);
+
+  /**
    * Must be defined by user to do computation on a single Vertex.
    *
    * @param messages Messages that were sent to this vertex in the previous
@@ -253,35 +261,6 @@ public abstract class Vertex<I extends W
   }
 
   /**
-   *  Get the list of incoming messages from the previous superstep.  Same as
-   *  the message iterator passed to compute().
-   *
-   *  @return Messages received.
-   */
-  public abstract Iterable<M> getMessages();
-
-  /**
-   * Get the number of messages from the previous superstep.
-   * @return Number of messages received.
-   */
-  public int getNumMessages() {
-    return Iterables.size(getMessages());
-  }
-
-  /**
-   * Copy the messages this vertex should process in the current superstep
-   *
-   * @param messages the messages sent to this vertex in the previous superstep
-   */
-  abstract void putMessages(Iterable<M> messages);
-
-  /**
-   * Release unnecessary resources (will be called after vertex returns from
-   * {@link #compute(Iterable)})
-   */
-  abstract void releaseResources();
-
-  /**
    * Get the graph state for all workers.
    *
    * @return Graph state for all workers
@@ -295,7 +274,7 @@ public abstract class Vertex<I extends W
    *
    * @param graphState Graph state for all workers
    */
-  public void setGraphState(GraphState<I, V, E, M> graphState) {
+  void setGraphState(GraphState<I, V, E, M> graphState) {
     this.graphState = graphState;
   }
 
@@ -346,14 +325,7 @@ public abstract class Vertex<I extends W
       edges.put(targetVertexId, edgeValue);
     }
 
-    int numMessages = in.readInt();
-    List<M> messages = new ArrayList<M>(numMessages);
-    for (int i = 0; i < numMessages; ++i) {
-      M message = (M) getConf().createMessageValue();
-      message.readFields(in);
-      messages.add(message);
-    }
-    initialize(vertexId, vertexValue, edges, messages);
+    initialize(vertexId, vertexValue, edges);
 
     halt = in.readBoolean();
   }
@@ -369,11 +341,6 @@ public abstract class Vertex<I extends W
       edge.getValue().write(out);
     }
 
-    out.writeInt(getNumMessages());
-    for (M message : getMessages()) {
-      message.write(out);
-    }
-
     out.writeBoolean(halt);
   }
 

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Thu Oct 11 16:48:45 2012
@@ -88,7 +88,6 @@ public class VertexResolver<I extends Wr
         vertex = instantiateVertex();
         vertex.initialize(vertexId,
             getConf().createVertexValue(),
-            null,
             null);
       }
     } else {

Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Thu Oct 11 16:48:45 2012
@@ -33,7 +33,7 @@ public class WorkerInfo implements Writa
   private String hostname;
   /** Task Partition (Worker) ID of this worker */
   private int taskId = -1;
-  /** Port that the RPC server is using */
+  /** Port that the IPC server is using */
   private int port = -1;
   /** Hostname + "_" + id for easier debugging */
   private String hostnameId;

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -170,7 +170,7 @@ public class PseudoRandomVertexInputForm
         } while (edges.containsKey(destVertexId));
         edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
       }
-      vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
+      vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
       ++verticesRead;
       if (LOG.isTraceEnabled()) {
         LOG.trace("next: Return vertexId=" +

Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -178,7 +178,7 @@ public abstract class TextVertexInputFor
     InterruptedException {
       Text line = getRecordReader().getCurrentValue();
       Vertex<I, V, E, M> vertex = getConf().createVertex();
-      vertex.initialize(getId(line), getValue(line), getEdges(line), null);
+      vertex.initialize(getId(line), getValue(line), getEdges(line));
       return vertex;
     }
 
@@ -248,7 +248,7 @@ public abstract class TextVertexInputFor
       T processed = preprocessLine(line);
       vertex = getConf().createVertex();
       vertex.initialize(getId(processed), getValue(processed),
-          getEdges(processed), null);
+          getEdges(processed));
       return vertex;
     }
 
@@ -335,7 +335,7 @@ public abstract class TextVertexInputFor
         Configuration conf = getContext().getConfiguration();
         vertex = getConf().createVertex();
         vertex.initialize(getId(processed), getValue(processed),
-            getEdges(processed), null);
+            getEdges(processed));
       } catch (IOException e) {
         throw e;
       // CHECKSTYLE: stop IllegalCatch

Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Thu Oct 11 16:48:45 2012
@@ -117,7 +117,7 @@ public class RequestTest {
         IntWritable, IntWritable>>();
     for (int i = 0; i < 10; ++i) {
       TestVertex vertex = new TestVertex();
-      vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
+      vertex.initialize(new IntWritable(i), new IntWritable(i));
       vertices.add(vertex);
     }
 
@@ -210,7 +210,7 @@ public class RequestTest {
       IntWritable, IntWritable>();
       for (int j = 0; j < 3; ++j) {
         TestVertex vertex = new TestVertex();
-        vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
+        vertex.initialize(new IntWritable(i), new IntWritable(j));
         mutations.addVertex(vertex);
       }
       for (int j = 0; j < 2; ++j) {

Modified: giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java Thu Oct 11 16:48:45 2012
@@ -52,7 +52,7 @@ public class SimpleShortestPathsVertexTe
   public void testOnShorterPathFound() throws Exception {
 
     SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
-    vertex.initialize(null, null, null, null);
+    vertex.initialize(null, null);
     vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
     vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
 
@@ -83,7 +83,7 @@ public class SimpleShortestPathsVertexTe
   public void testOnNoShorterPathFound() throws Exception {
 
     SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
-    vertex.initialize(null, null, null, null);
+    vertex.initialize(null, null);
     vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
     vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
 

Modified: giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java Thu Oct 11 16:48:45 2012
@@ -46,7 +46,7 @@ public class SimpleTriangleClosingVertex
       new SimpleTriangleClosingVertex();
     SimpleTriangleClosingVertex.IntArrayListWritable alw =
       new SimpleTriangleClosingVertex.IntArrayListWritable();
-    vertex.initialize(null, null, null, null);
+    vertex.initialize(null, null);
     vertex.addEdge(new IntWritable(5), NullWritable.get());
     vertex.addEdge(new IntWritable(7), NullWritable.get());
 
@@ -72,7 +72,7 @@ public class SimpleTriangleClosingVertex
     // messages properly to verify the algorithm
     SimpleTriangleClosingVertex vertex =
       new SimpleTriangleClosingVertex();
-    vertex.initialize(null, null, null, null);
+    vertex.initialize(null, null);
     MockUtils.MockedEnvironment<IntWritable,
       SimpleTriangleClosingVertex.IntArrayListWritable,
       NullWritable, IntWritable>