You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2012/10/11 18:48:47 UTC
svn commit: r1397159 [1/2] - in /giraph/trunk: ./
giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/
giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/
giraph-formats-contrib/src/test/java/org/apache/giraph/...
Author: apresta
Date: Thu Oct 11 16:48:45 2012
New Revision: 1397159
URL: http://svn.apache.org/viewvc?rev=1397159&view=rev
Log:
GIRAPH-200: remove hadoop RPC and keep just netty (apresta)
Added:
giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
Removed:
giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/CommunicationsInterface.java
giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/SecureRPCCommunications.java
giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessages.java
giraph/trunk/src/main/java/org/apache/giraph/comm/VertexIdMessagesList.java
giraph/trunk/src/main/java/org/apache/giraph/comm/VertexList.java
giraph/trunk/src/main/java/org/apache/giraph/hadoop/
giraph/trunk/src/test/java/org/apache/giraph/comm/RPCCommunicationsTest.java
giraph/trunk/src/test/java/org/apache/giraph/examples/TryMultiRpcBindingPortsTest.java
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java
giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
giraph/trunk/src/test/java/org/apache/giraph/graph/TestEdgeListVertex.java
giraph/trunk/src/test/java/org/apache/giraph/graph/TestIntIntNullIntVertex.java
giraph/trunk/src/test/java/org/apache/giraph/graph/partition/TestPartitionStores.java
giraph/trunk/src/test/java/org/apache/giraph/io/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Thu Oct 11 16:48:45 2012
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-200: remove hadoop RPC and keep just netty (apresta)
+
GIRAPH-363: Fix hadoop_0.23 profile broken by GIRAPH-211 (ekoontz)
GIRAPH-211: Add secure authentication to Netty IPC (ekoontz)
Modified: giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/main/java/org/apache/giraph/io/hcatalog/HCatalogVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -237,7 +237,7 @@ public abstract class HCatalogVertexInpu
BspUtils.createVertex(getContext()
.getConfiguration());
vertex.initialize(getVertexId(record), getVertexValue(record),
- getEdges(record), null);
+ getEdges(record));
++recordCount;
if ((recordCount % recordModLimit) == 0) {
log.info("read " + recordCount + " records");
@@ -373,9 +373,8 @@ public abstract class HCatalogVertexInpu
private void createCurrentVertex() {
vertex = BspUtils.
createVertex(getContext().getConfiguration());
- vertex.initialize(currentVertexId,
- getVertexValue(
- recordsForVertex), destEdgeMap, null);
+ vertex.initialize(currentVertexId, getVertexValue(recordsForVertex),
+ destEdgeMap);
destEdgeMap.clear();
recordsForVertex.clear();
++recordCount;
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java Thu Oct 11 16:48:45 2012
@@ -87,7 +87,7 @@ public class AccumuloEdgeInputFormat
String edge = new String(value.get());
Text edgeId = new Text(edge);
edges.put(edgeId, uselessEdgeValue);
- vertex.initialize(vertexId, new Text(), edges, null);
+ vertex.initialize(vertexId, new Text(), edges);
return vertex;
}
Modified: giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java (original)
+++ giraph/trunk/giraph-formats-contrib/src/test/java/org/apache/giraph/io/hbase/edgemarker/TableEdgeInputFormat.java Thu Oct 11 16:48:45 2012
@@ -93,7 +93,7 @@ public class TableEdgeInputFormat extend
Text vertexValue = new Text();
Text edgeId = new Text(edge);
edges.put(edgeId, uselessEdgeValue);
- vertex.initialize(vertexId, vertexValue, edges, null);
+ vertex.initialize(vertexId, vertexValue, edges);
return vertex;
}
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Thu Oct 11 16:48:45 2012
@@ -558,7 +558,7 @@ under the License.
</executions>
<!-- profile: hadoop_0.20.203 -->
<configuration>
- <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
+ <symbols>HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
</configuration>
</plugin>
</plugins>
@@ -608,7 +608,7 @@ under the License.
</executions>
<!-- profile: hadoop_1.0 -->
<configuration>
- <symbols>HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
+ <symbols>HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_1_SECURITY,HADOOP_1_SECRET_MANAGER</symbols>
</configuration>
</plugin>
</plugins>
@@ -657,17 +657,17 @@ under the License.
</executions>
<configuration>
<!-- profile: hadoop_non_secure -->
- <symbols>HADOOP_NON_SECURE,HADOOP_NON_INTERVERSIONED_RPC,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
- <excludes>**/comm/SecureRPCCommunications.java,
- **/comm/netty/SaslNettyClient.java,
- **/comm/netty/SaslNettyServer.java,
- **/comm/netty/handler/AuthorizeServerHandler.java,
- **/comm/netty/handler/SaslClientHandler.java,
- **/comm/netty/handler/SaslServerHandler.java,
- **/comm/requests/SaslCompleteRequest.java,
- **/comm/requests/SaslTokenMessageRequest.java,
- **/comm/SaslConnectionTest.java,
- **/hadoop/BspTokenSelector.java</excludes>
+ <symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
+ <excludes>
+ **/comm/netty/SaslNettyClient.java,
+ **/comm/netty/SaslNettyServer.java,
+ **/comm/netty/handler/AuthorizeServerHandler.java,
+ **/comm/netty/handler/SaslClientHandler.java,
+ **/comm/netty/handler/SaslServerHandler.java,
+ **/comm/requests/SaslCompleteRequest.java,
+ **/comm/requests/SaslTokenMessageRequest.java,
+ **/comm/SaslConnectionTest.java
+ </excludes>
</configuration>
</plugin>
<plugin>
@@ -720,10 +720,6 @@ under the License.
<resources>
<resource>
<directory>${basedir}/src/main/java/org/apache/giraph/hadoop</directory>
- <excludes>
- <exclude>BspTokenSelector.java</exclude>
- <exclude>SecureRPCCommunications.java</exclude>
- </excludes>
</resource>
<resource>
<directory>${basedir}/src/main/java/org/apache/giraph/comm/netty</directory>
@@ -735,10 +731,6 @@ under the License.
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
- <excludes>
- <exclude>**/BspTokenSelector.java</exclude>
- <exclude>**/SecureRPCCommunications.java</exclude>
- </excludes>
<source>${compileSource}</source>
<target>${compileSource}</target>
</configuration>
@@ -759,16 +751,16 @@ under the License.
<!-- profile: hadoop_facebook -->
<configuration>
<symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE</symbols>
- <excludes>**/comm/SecureRPCCommunications.java,
- **/comm/netty/SaslNettyClient.java,
- **/comm/netty/SaslNettyServer.java,
- **/comm/netty/handler/AuthorizeServerHandler.java,
- **/comm/netty/handler/SaslClientHandler.java,
- **/comm/netty/handler/SaslServerHandler.java,
- **/comm/requests/SaslCompleteRequest.java,
- **/comm/requests/SaslTokenMessageRequest.java,
- **/comm/SaslConnectionTest.java,
- **/hadoop/BspTokenSelector.java</excludes>
+ <excludes>
+ **/comm/netty/SaslNettyClient.java,
+ **/comm/netty/SaslNettyServer.java,
+ **/comm/netty/handler/AuthorizeServerHandler.java,
+ **/comm/netty/handler/SaslClientHandler.java,
+ **/comm/netty/handler/SaslServerHandler.java,
+ **/comm/requests/SaslCompleteRequest.java,
+ **/comm/requests/SaslTokenMessageRequest.java,
+ **/comm/SaslConnectionTest.java
+ </excludes>
</configuration>
</plugin>
<plugin>
Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Thu Oct 11 16:48:45 2012
@@ -156,11 +156,6 @@ public class GiraphConfiguration extends
/** Local ZooKeeper directory to use */
public static final String ZOOKEEPER_DIR = "giraph.zkDir";
- /** Use the RPC communication or netty communication */
- public static final String USE_NETTY = "giraph.useNetty";
- /** Default is to use RPC, not netty */
- public static final boolean USE_NETTY_DEFAULT = false;
-
/** TCP backlog (defaults to number of workers) */
public static final String TCP_BACKLOG = "giraph.tcpBacklog";
/**
@@ -278,29 +273,24 @@ public class GiraphConfiguration extends
/** Default Netty max connection failures */
public static final int NETTY_MAX_CONNECTION_FAILURES_DEFAULT = 1000;
- /** Initial port to start using for the RPC communication */
- public static final String RPC_INITIAL_PORT = "giraph.rpcInitialPort";
- /** Default port to start using for the RPC communication */
- public static final int RPC_INITIAL_PORT_DEFAULT = 30000;
-
- /** Maximum bind attempts for different RPC ports */
- public static final String MAX_RPC_PORT_BIND_ATTEMPTS =
- "giraph.maxRpcPortBindAttempts";
- /** Default maximum bind attempts for different RPC ports */
- public static final int MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
+ /** Initial port to start using for the IPC communication */
+ public static final String IPC_INITIAL_PORT = "giraph.ipcInitialPort";
+ /** Default port to start using for the IPC communication */
+ public static final int IPC_INITIAL_PORT_DEFAULT = 30000;
+
+ /** Maximum bind attempts for different IPC ports */
+ public static final String MAX_IPC_PORT_BIND_ATTEMPTS =
+ "giraph.maxIpcPortBindAttempts";
+ /** Default maximum bind attempts for different IPC ports */
+ public static final int MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT = 20;
/**
- * Fail first RPC port binding attempt, simulate binding failure
+ * Fail first IPC port binding attempt, simulate binding failure
* on real grid testing
*/
- public static final String FAIL_FIRST_RPC_PORT_BIND_ATTEMPT =
- "giraph.failFirstRpcPortBindAttempt";
- /** Default fail first RPC port binding attempt flag */
- public static final boolean FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT = false;
-
- /** Maximum number of RPC handlers */
- public static final String RPC_NUM_HANDLERS = "giraph.rpcNumHandlers";
- /** Default maximum number of RPC handlers */
- public static final int RPC_NUM_HANDLERS_DEFAULT = 100;
+ public static final String FAIL_FIRST_IPC_PORT_BIND_ATTEMPT =
+ "giraph.failFirstIpcPortBindAttempt";
+ /** Default fail first IPC port binding attempt flag */
+ public static final boolean FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT = false;
/** Client send buffer size */
public static final String CLIENT_SEND_BUFFER_SIZE =
@@ -762,10 +752,6 @@ public class GiraphConfiguration extends
return getBoolean(LOCAL_TEST_MODE, LOCAL_TEST_MODE_DEFAULT);
}
- public boolean getUseNetty() {
- return getBoolean(USE_NETTY, USE_NETTY_DEFAULT);
- }
-
public int getZooKeeperServerCount() {
return getInt(ZOOKEEPER_SERVER_COUNT,
ZOOKEEPER_SERVER_COUNT_DEFAULT);
Modified: giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java Thu Oct 11 16:48:45 2012
@@ -167,15 +167,6 @@ public interface CentralizedServiceWorke
Collection<? extends PartitionOwner> masterSetPartitionOwners);
/**
- * Assign messages to a vertex (bypasses package-private access to
- * setMessages() for internal classes).
- *
- * @param vertex Vertex (owned by worker)
- * @param messages Messages to assign to the vertex
- */
- void assignMessagesToVertex(Vertex<I, V, E, M> vertex, Iterable<M> messages);
-
- /**
* Get master info
*
* @return Master info
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java Thu Oct 11 16:48:45 2012
@@ -44,7 +44,7 @@ public interface WorkerServer<I extends
/**
* Move the in transition messages to the in messages for every vertex and
- * add new connections to any newly appearing RPC proxies.
+ * add new connections to any newly appearing IPC proxies.
*/
void prepareSuperstep();
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java Thu Oct 11 16:48:45 2012
@@ -148,8 +148,11 @@ public class SimpleMessageStore<I extend
public Collection<M> getVertexMessages(I vertexId) throws IOException {
ConcurrentMap<I, Collection<M>> partitionMap =
map.get(getPartitionId(vertexId));
- return (partitionMap == null) ? Collections.<M>emptyList() :
- partitionMap.get(vertexId);
+ if (partitionMap == null) {
+ return Collections.<M>emptyList();
+ }
+ Collection<M> messages = partitionMap.get(vertexId);
+ return (messages == null) ? Collections.<M>emptyList() : messages;
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java Thu Oct 11 16:48:45 2012
@@ -16,6 +16,6 @@
* limitations under the License.
*/
/**
- * Package of communication related objects, RPC service.
+ * Package of communication related objects, IPC service.
*/
package org.apache.giraph.comm.messages;
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyServer.java Thu Oct 11 16:48:45 2012
@@ -279,27 +279,27 @@ else[HADOOP_NON_SECURE]*/
int numServers = conf.getInt(GiraphConfiguration.MAX_WORKERS, numTasks) + 1;
int portIncrementConstant =
(int) Math.pow(10, Math.ceil(Math.log10(numServers)));
- int bindPort = conf.getInt(GiraphConfiguration.RPC_INITIAL_PORT,
- GiraphConfiguration.RPC_INITIAL_PORT_DEFAULT) +
+ int bindPort = conf.getInt(GiraphConfiguration.IPC_INITIAL_PORT,
+ GiraphConfiguration.IPC_INITIAL_PORT_DEFAULT) +
taskId;
int bindAttempts = 0;
- final int maxRpcPortBindAttempts =
- conf.getInt(GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS,
- GiraphConfiguration.MAX_RPC_PORT_BIND_ATTEMPTS_DEFAULT);
+ final int maxIpcPortBindAttempts =
+ conf.getInt(GiraphConfiguration.MAX_IPC_PORT_BIND_ATTEMPTS,
+ GiraphConfiguration.MAX_IPC_PORT_BIND_ATTEMPTS_DEFAULT);
final boolean failFirstPortBindingAttempt =
- conf.getBoolean(GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT,
- GiraphConfiguration.FAIL_FIRST_RPC_PORT_BIND_ATTEMPT_DEFAULT);
+ conf.getBoolean(GiraphConfiguration.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT,
+ GiraphConfiguration.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT_DEFAULT);
// Simple handling of port collisions on the same machine while
// preserving debugability from the port number alone.
// Round up the max number of workers to the next power of 10 and use
// it as a constant to increase the port number with.
- while (bindAttempts < maxRpcPortBindAttempts) {
+ while (bindAttempts < maxIpcPortBindAttempts) {
this.myAddress = new InetSocketAddress(localHostname, bindPort);
if (failFirstPortBindingAttempt && bindAttempts == 0) {
if (LOG.isInfoEnabled()) {
LOG.info("start: Intentionally fail first " +
- "binding attempt as giraph.failFirstRpcPortBindAttempt " +
+ "binding attempt as giraph.failFirstIpcPortBindAttempt " +
"is true, port " + bindPort);
}
++bindAttempts;
@@ -319,7 +319,7 @@ else[HADOOP_NON_SECURE]*/
bindPort += portIncrementConstant;
}
}
- if (bindAttempts == maxRpcPortBindAttempts || myAddress == null) {
+ if (bindAttempts == maxIpcPortBindAttempts || myAddress == null) {
throw new IllegalStateException(
"start: Failed to start NettyServer with " +
bindAttempts + " attempts");
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java Thu Oct 11 16:48:45 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.comm.netty;
-import java.util.Iterator;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.bsp.CentralizedServiceWorker;
@@ -49,6 +48,7 @@ import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -134,7 +134,7 @@ public class NettyWorkerClient<I extends
@Override
public void fixPartitionIdToSocketAddrMap() {
// 1. Fix all the cached inet addresses (remove all changed entries)
- // 2. Connect to any new RPC servers
+ // 2. Connect to any new IPC servers
Map<InetSocketAddress, Integer> addressTaskIdMap =
Maps.newHashMapWithExpectedSize(service.getPartitionOwners().size());
for (PartitionOwner partitionOwner : service.getPartitionOwners()) {
@@ -165,12 +165,8 @@ public class NettyWorkerClient<I extends
partitionOwner.getWorkerInfo().getTaskId());
}
}
- boolean useNetty = conf.getBoolean(GiraphConfiguration.USE_NETTY,
- GiraphConfiguration.USE_NETTY_DEFAULT);
- if (useNetty) {
- addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
- null);
- }
+ addressTaskIdMap.put(service.getMasterInfo().getInetSocketAddress(),
+ null);
nettyClient.connectAllAddresses(addressTaskIdMap);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/package-info.java Thu Oct 11 16:48:45 2012
@@ -16,6 +16,6 @@
* limitations under the License.
*/
/**
- * Package of communication related objects, RPC service.
+ * Package of communication related objects, IPC service.
*/
package org.apache.giraph.comm;
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/LongDoubleFloatDoubleTextInputFormat.java Thu Oct 11 16:48:45 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -78,8 +77,7 @@ public class LongDoubleFloatDoubleTextIn
}
LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
- vertex.initialize(vertexId, new DoubleWritable(), edges,
- Lists.<DoubleWritable>newArrayList());
+ vertex.initialize(vertexId, new DoubleWritable(), edges);
return vertex;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleFloatDoubleTextInputFormat.java Thu Oct 11 16:48:45 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -78,8 +77,7 @@ public class NormalizingLongDoubleFloatD
normalize(edges);
LongWritable vertexId = new LongWritable(Long.parseLong(tokens[0]));
- vertex.initialize(vertexId, new DoubleWritable(), edges,
- Lists.<DoubleWritable>newArrayList());
+ vertex.initialize(vertexId, new DoubleWritable(), edges);
return vertex;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleMutateGraphVertex.java Thu Oct 11 16:48:45 2012
@@ -87,7 +87,7 @@ public class SimpleMutateGraphVertex ext
new LongWritable(rangeVertexIdStart(3) + getId().get());
Vertex<LongWritable, DoubleWritable,
FloatWritable, DoubleWritable> vertex =
- instantiateVertex(vertexIndex, new DoubleWritable(0.0), null, null);
+ instantiateVertex(vertexIndex, new DoubleWritable(0.0), null);
addVertexRequest(vertex);
// Add edges to those remote vertices as well
addEdgeRequest(vertexIndex,
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimplePageRankVertex.java Thu Oct 11 16:48:45 2012
@@ -195,7 +195,7 @@ public class SimplePageRankVertex extend
float edgeValue = vertexId.get() * 100f;
Map<LongWritable, FloatWritable> edges = Maps.newHashMap();
edges.put(new LongWritable(targetVertexId), new FloatWritable(edgeValue));
- vertex.initialize(vertexId, vertexValue, edges, null);
+ vertex.initialize(vertexId, vertexValue, edges);
++recordsRead;
if (LOG.isInfoEnabled()) {
LOG.info("next: Return vertexId=" + vertex.getId().get() +
Modified: giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/examples/SimpleSuperstepVertex.java Thu Oct 11 16:48:45 2012
@@ -84,7 +84,7 @@ public class SimpleSuperstepVertex exten
float edgeValue = vertexId.get() * 100f;
edgeMap.put(new LongWritable(targetVertexId),
new FloatWritable(edgeValue));
- vertex.initialize(vertexId, vertexValue, edgeMap, null);
+ vertex.initialize(vertexId, vertexValue, edgeMap);
++recordsRead;
if (LOG.isInfoEnabled()) {
LOG.info("next: Return vertexId=" + vertex.getId().get() +
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Thu Oct 11 16:48:45 2012
@@ -18,7 +18,6 @@
package org.apache.giraph.graph;
-import com.google.common.collect.Sets;
import org.apache.giraph.GiraphConfiguration;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
@@ -56,6 +55,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
import org.json.JSONException;
import org.json.JSONObject;
+import com.google.common.collect.Sets;
import net.iharder.Base64;
import java.io.ByteArrayOutputStream;
@@ -762,15 +762,13 @@ public class BspServiceMaster<I extends
aggregatorHandler = new MasterAggregatorHandler(getConfiguration());
aggregatorHandler.initialize(this);
- if (getConfiguration().getUseNetty()) {
- commService = new NettyMasterClientServer(
- getContext(), getConfiguration());
- masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
- commService.getMyAddress().getPort());
- // write my address to znode so workers could read it
- WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
- masterInfo);
- }
+ commService = new NettyMasterClientServer(
+ getContext(), getConfiguration());
+ masterInfo = new WorkerInfo(getHostname(), getTaskPartition(),
+ commService.getMyAddress().getPort());
+ // write my address to znode so workers could read it
+ WritableUtils.writeToZnode(getZkExt(), currentMasterPath, -1,
+ masterInfo);
if (LOG.isInfoEnabled()) {
LOG.info("becomeMaster: I am now the master!");
@@ -1273,9 +1271,7 @@ public class BspServiceMaster<I extends
}
}
- if (getConfiguration().getUseNetty()) {
- commService.fixWorkerAddresses(chosenWorkerInfoList);
- }
+ commService.fixWorkerAddresses(chosenWorkerInfoList);
currentWorkersCounter.increment(chosenWorkerInfoList.size() -
currentWorkersCounter.getValue());
@@ -1549,10 +1545,8 @@ public class BspServiceMaster<I extends
}
aggregatorHandler.close();
- if (getConfiguration().getUseNetty()) {
- commService.closeConnections();
- commService.close();
- }
+ commService.closeConnections();
+ commService.close();
}
try {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Oct 11 16:48:45 2012
@@ -24,17 +24,11 @@ import org.apache.giraph.bsp.Centralized
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClientServer;
import org.apache.giraph.comm.netty.NettyWorkerClientServer;
-/*if[HADOOP_NON_SECURE]
-import org.apache.giraph.comm.RPCCommunications;
-else[HADOOP_NON_SECURE]*/
-import org.apache.giraph.comm.SecureRPCCommunications;
-/*end[HADOOP_NON_SECURE]*/
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionExchange;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.PartitionStore;
-import org.apache.giraph.graph.partition.SimplePartitionStore;
import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.WritableUtils;
@@ -121,10 +115,6 @@ public class BspServiceWorker<I extends
/** Input split max vertices (-1 denotes all) */
private final long inputSplitMaxVertices;
/**
- * Partition store for worker (only used by the Hadoop RPC implementation).
- */
- private final PartitionStore<I, V, E, M> workerPartitionStore;
- /**
* Stores and processes the list of InputSplits advertised
* in a tree of child znodes by the master.
*/
@@ -162,27 +152,14 @@ public class BspServiceWorker<I extends
GiraphConfiguration.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
workerGraphPartitioner =
getGraphPartitionerFactory().createWorkerGraphPartitioner();
- boolean useNetty = getConfiguration().getUseNetty();
- if (useNetty) {
- commService = new NettyWorkerClientServer<I, V, E, M>(
- context, getConfiguration(), this);
- } else {
-/*if[HADOOP_NON_SECURE]
- commService =
- new RPCCommunications<I, V, E, M>(context, this, getConfiguration(),
- graphState);
-else[HADOOP_NON_SECURE]*/
- commService =
- new SecureRPCCommunications<I, V, E, M>(context, this,
- getConfiguration(), graphState);
-/*end[HADOOP_NON_SECURE]*/
- }
+ commService = new NettyWorkerClientServer<I, V, E, M>(
+ context, getConfiguration(), this);
+
if (LOG.isInfoEnabled()) {
LOG.info("BspServiceWorker: maxVerticesPerTransfer = " +
transferRegulator.getMaxVerticesPerTransfer());
LOG.info("BspServiceWorker: maxEdgesPerTransfer = " +
- transferRegulator.getMaxEdgesPerTransfer() +
- " useNetty = " + useNetty);
+ transferRegulator.getMaxEdgesPerTransfer());
}
workerInfo = new WorkerInfo(
@@ -192,14 +169,6 @@ else[HADOOP_NON_SECURE]*/
this.workerContext =
getConfiguration().createWorkerContext(graphMapper.getGraphState());
- if (useNetty) {
- workerPartitionStore = null;
- } else {
- workerPartitionStore =
- new SimplePartitionStore<I, V, E, M>(getConfiguration(),
- getContext());
- }
-
aggregatorHandler = new WorkerAggregatorHandler();
}
@@ -542,12 +511,6 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public void assignMessagesToVertex(Vertex<I, V, E, M> vertex,
- Iterable<M> messages) {
- vertex.putMessages(messages);
- }
-
- @Override
public WorkerInfo getMasterInfo() {
return masterInfo;
}
@@ -840,12 +803,9 @@ else[HADOOP_NON_SECURE]*/
"startSuperstep: InterruptedException getting assignments", e);
}
-
- if (getConfiguration().getUseNetty()) {
- // get address of master
- WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
- null, masterInfo);
- }
+ // get address of master
+ WritableUtils.readFieldsFromZnode(getZkExt(), currentMasterPath, false,
+ null, masterInfo);
if (LOG.isInfoEnabled()) {
LOG.info("startSuperstep: Ready for computation on superstep " +
@@ -1097,7 +1057,6 @@ else[HADOOP_NON_SECURE]*/
LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
}
- boolean useNetty = getConfiguration().getUseNetty();
FSDataOutputStream verticesOutputStream =
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
@@ -1107,11 +1066,8 @@ else[HADOOP_NON_SECURE]*/
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
// write messages
- verticesOutputStream.writeBoolean(useNetty);
- if (useNetty) {
- getServerData().getCurrentMessageStore().writePartition(
- verticesOutputStream, partition.getId());
- }
+ getServerData().getCurrentMessageStore().writePartition(
+ verticesOutputStream, partition.getId());
// Write the metadata for this partition
// Format:
// <index count>
@@ -1166,16 +1122,13 @@ else[HADOOP_NON_SECURE]*/
@Override
public void loadCheckpoint(long superstep) {
- if (getConfiguration().getBoolean(GiraphConfiguration.USE_NETTY,
- GiraphConfiguration.USE_NETTY_DEFAULT)) {
- try {
- // clear old message stores
- getServerData().getIncomingMessageStore().clearAll();
- getServerData().getCurrentMessageStore().clearAll();
- } catch (IOException e) {
- throw new RuntimeException(
- "loadCheckpoint: Failed to clear message stores ", e);
- }
+ try {
+ // clear old message stores
+ getServerData().getIncomingMessageStore().clearAll();
+ getServerData().getCurrentMessageStore().clearAll();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "loadCheckpoint: Failed to clear message stores ", e);
}
// Algorithm:
@@ -1462,11 +1415,7 @@ else[HADOOP_NON_SECURE]*/
@Override
public PartitionStore<I, V, E, M> getPartitionStore() {
- if (workerPartitionStore != null) {
- return workerPartitionStore;
- } else {
- return getServerData().getPartitionStore();
- }
+ return getServerData().getPartitionStore();
}
@Override
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/EdgeListVertex.java Thu Oct 11 16:48:45 2012
@@ -22,7 +22,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.DataInput;
@@ -50,20 +49,14 @@ public abstract class EdgeListVertex<I e
private static final Logger LOG = Logger.getLogger(EdgeListVertex.class);
/** List of edges */
private List<Edge<I, E>> edgeList = Lists.newArrayList();
- /** List of incoming messages from the previous superstep */
- private List<M> messageList = Lists.newArrayList();
@Override
- public void initialize(I id, V value, Map<I, E> edges, Iterable<M> messages) {
- super.initialize(id, value);
+ public void setEdges(Map<I, E> edges) {
if (edges != null) {
for (Map.Entry<I, E> edge : edges.entrySet()) {
edgeList.add(new Edge<I, E>(edge.getKey(), edge.getValue()));
}
}
- if (messages != null) {
- Iterables.<M>addAll(messageList, messages);
- }
}
@Override
@@ -104,28 +97,12 @@ public abstract class EdgeListVertex<I e
}
@Override
- void putMessages(Iterable<M> messages) {
- messageList.clear();
- Iterables.addAll(messageList, messages);
- }
-
- @Override
- public Iterable<M> getMessages() {
- return Iterables.unmodifiableIterable(messageList);
- }
-
- @Override
- public int getNumMessages() {
- return messageList.size();
- }
-
- @Override
public final void readFields(DataInput in) throws IOException {
I vertexId = getConf().createVertexId();
vertexId.readFields(in);
V vertexValue = getConf().createVertexValue();
vertexValue.readFields(in);
- super.initialize(vertexId, vertexValue);
+ initialize(vertexId, vertexValue);
int numEdges = in.readInt();
edgeList = Lists.newArrayListWithCapacity(numEdges);
@@ -137,14 +114,6 @@ public abstract class EdgeListVertex<I e
edgeList.add(new Edge<I, E>(targetVertexId, edgeValue));
}
- int numMessages = in.readInt();
- messageList = Lists.newArrayListWithCapacity(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- M message = getConf().createMessageValue();
- message.readFields(in);
- messageList.add(message);
- }
-
boolean halt = in.readBoolean();
if (halt) {
voteToHalt();
@@ -164,18 +133,8 @@ public abstract class EdgeListVertex<I e
edge.getValue().write(out);
}
- out.writeInt(messageList.size());
- for (M message : messageList) {
- message.write(out);
- }
-
out.writeBoolean(isHalted());
}
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- messageList.clear();
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Thu Oct 11 16:48:45 2012
@@ -39,8 +39,6 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.common.collect.Iterables;
-
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URL;
@@ -471,11 +469,8 @@ public class GraphMapper<I extends Writa
serviceWorker.getWorkerContext().preSuperstep();
context.progress();
- boolean useNetty = conf.getUseNetty();
- MessageStoreByPartition<I, M> messageStore = null;
- if (useNetty) {
- messageStore = serviceWorker.getServerData().getCurrentMessageStore();
- }
+ MessageStoreByPartition<I, M> messageStore =
+ serviceWorker.getServerData().getCurrentMessageStore();
partitionStatsList.clear();
TimedLogger partitionLogger = new TimedLogger(15000, LOG);
@@ -489,27 +484,16 @@ public class GraphMapper<I extends Writa
// graphState before computing
vertex.setGraphState(graphState);
- Collection<M> messages = null;
- if (useNetty) {
- messages = messageStore.getVertexMessages(vertex.getId());
- messageStore.clearVertexMessages(vertex.getId());
- }
+ Collection<M> messages =
+ messageStore.getVertexMessages(vertex.getId());
+ messageStore.clearVertexMessages(vertex.getId());
- boolean hasMessages = (messages != null && !messages.isEmpty()) ||
- !Iterables.isEmpty(vertex.getMessages());
- if (vertex.isHalted() && hasMessages) {
+ if (vertex.isHalted() && !messages.isEmpty()) {
vertex.wakeUp();
}
if (!vertex.isHalted()) {
- Iterable<M> vertexMsgIt;
- if (messages == null) {
- vertexMsgIt = vertex.getMessages();
- } else {
- vertexMsgIt = messages;
- }
context.progress();
- vertex.compute(vertexMsgIt);
- vertex.releaseResources();
+ vertex.compute(messages);
}
if (vertex.isHalted()) {
partitionStats.incrFinishedVertexCount();
@@ -518,9 +502,7 @@ public class GraphMapper<I extends Writa
partitionStats.addEdgeCount(vertex.getNumEdges());
}
- if (useNetty) {
- messageStore.clearPartition(partition.getId());
- }
+ messageStore.clearPartition(partition.getId());
partitionStatsList.add(partitionStats);
++completedPartitions;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/HashMapVertex.java Thu Oct 11 16:48:45 2012
@@ -24,14 +24,12 @@ import org.apache.log4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -56,17 +54,10 @@ public abstract class HashMapVertex<I ex
private static final Logger LOG = Logger.getLogger(HashMapVertex.class);
/** Map of target vertices and their edge values */
protected Map<I, E> edgeMap = new HashMap<I, E>();
- /** List of incoming messages from the previous superstep */
- private List<M> messageList = Lists.newArrayList();
@Override
- public void initialize(
- I id, V value, Map<I, E> edges, Iterable<M> messages) {
- super.initialize(id, value);
+ public void setEdges(Map<I, E> edges) {
edgeMap.putAll(edges);
- if (messages != null) {
- Iterables.<M>addAll(messageList, messages);
- }
}
@Override
@@ -129,28 +120,12 @@ public abstract class HashMapVertex<I ex
}
@Override
- void putMessages(Iterable<M> messages) {
- messageList.clear();
- Iterables.addAll(messageList, messages);
- }
-
- @Override
- public Iterable<M> getMessages() {
- return Iterables.unmodifiableIterable(messageList);
- }
-
- @Override
- public int getNumMessages() {
- return messageList.size();
- }
-
- @Override
public final void readFields(DataInput in) throws IOException {
I vertexId = getConf().createVertexId();
vertexId.readFields(in);
V vertexValue = getConf().createVertexValue();
vertexValue.readFields(in);
- super.initialize(vertexId, vertexValue);
+ initialize(vertexId, vertexValue);
int numEdges = in.readInt();
edgeMap = Maps.newHashMapWithExpectedSize(numEdges);
@@ -162,14 +137,6 @@ public abstract class HashMapVertex<I ex
edgeMap.put(targetVertexId, edgeValue);
}
- int numMessages = in.readInt();
- messageList = Lists.newArrayListWithCapacity(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- M message = getConf().createMessageValue();
- message.readFields(in);
- messageList.add(message);
- }
-
boolean halt = in.readBoolean();
if (halt) {
voteToHalt();
@@ -189,18 +156,8 @@ public abstract class HashMapVertex<I ex
edge.getValue().write(out);
}
- out.writeInt(messageList.size());
- for (M message : messageList) {
- message.write(out);
- }
-
out.writeBoolean(isHalted());
}
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- messageList.clear();
- }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/IntIntNullIntVertex.java Thu Oct 11 16:48:45 2012
@@ -20,15 +20,12 @@ package org.apache.giraph.graph;
import org.apache.giraph.utils.UnmodifiableIntArrayIterator;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import com.google.common.collect.Iterables;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
/**
* Simple implementation of {@link Vertex} using an int as id, value and
@@ -39,26 +36,14 @@ public abstract class IntIntNullIntVerte
SimpleVertex<IntWritable, IntWritable, IntWritable> {
/** Int array of neighbor vertex ids */
private int[] neighbors;
- /** Int array of messages */
- private int[] messages;
@Override
- public void initialize(IntWritable id, IntWritable value,
- Map<IntWritable, NullWritable> edges,
- Iterable<IntWritable> messages) {
- super.initialize(id, value);
- this.neighbors = new int[(edges != null) ? edges.size() : 0];
+ public void setNeighbors(Set<IntWritable> neighborSet) {
+ neighbors = new int[(neighborSet != null) ? neighborSet.size() : 0];
int n = 0;
- if (edges != null) {
- for (Map.Entry<IntWritable, NullWritable> edge : edges.entrySet()) {
- this.neighbors[n++] = edge.getKey().get();
- }
- }
- this.messages = new int[(messages != null) ? Iterables.size(messages) : 0];
- if (messages != null) {
- n = 0;
- for (IntWritable message : messages) {
- this.messages[n++] = message.get();
+ if (neighborSet != null) {
+ for (IntWritable neighbor : neighborSet) {
+ neighbors[n++] = neighbor.get();
}
}
}
@@ -89,35 +74,6 @@ public abstract class IntIntNullIntVerte
}
@Override
- public Iterable<IntWritable> getMessages() {
- return new Iterable<IntWritable>() {
- @Override
- public Iterator<IntWritable> iterator() {
- return new UnmodifiableIntArrayIterator(messages);
- }
- };
- }
-
- @Override
- public void putMessages(Iterable<IntWritable> newMessages) {
- messages = new int[Iterables.size(newMessages)];
- int n = 0;
- for (IntWritable message : newMessages) {
- messages[n++] = message.get();
- }
- }
-
- @Override
- public int getNumMessages() {
- return messages.length;
- }
-
- @Override
- void releaseResources() {
- messages = new int[0];
- }
-
- @Override
public void write(final DataOutput out) throws IOException {
out.writeInt(getId().get());
out.writeInt(getValue().get());
@@ -125,10 +81,6 @@ public abstract class IntIntNullIntVerte
for (int n = 0; n < neighbors.length; n++) {
out.writeInt(neighbors[n]);
}
- out.writeInt(messages.length);
- for (int n = 0; n < messages.length; n++) {
- out.writeInt(messages[n]);
- }
out.writeBoolean(isHalted());
}
@@ -136,17 +88,12 @@ public abstract class IntIntNullIntVerte
public void readFields(DataInput in) throws IOException {
int id = in.readInt();
int value = in.readInt();
- super.initialize(new IntWritable(id), new IntWritable(value));
+ initialize(new IntWritable(id), new IntWritable(value));
int numEdges = in.readInt();
neighbors = new int[numEdges];
for (int n = 0; n < numEdges; n++) {
neighbors[n] = in.readInt();
}
- int numMessages = in.readInt();
- messages = new int[numMessages];
- for (int n = 0; n < numMessages; n++) {
- messages[n] = in.readInt();
- }
boolean halt = in.readBoolean();
if (halt) {
voteToHalt();
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleEdgeListVertex.java Thu Oct 11 16:48:45 2012
@@ -21,24 +21,19 @@ package org.apache.giraph.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
import org.apache.giraph.utils.UnmodifiableLongFloatEdgeArrayIterable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
-import com.google.common.collect.Iterables;
-
/**
* Compact vertex representation with primitive arrays.
*/
public abstract class LongDoubleFloatDoubleEdgeListVertex extends
Vertex<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
-
/** long represented vertex id */
private long id;
/** double represented vertex value */
@@ -47,13 +42,10 @@ public abstract class LongDoubleFloatDou
private long[] neighbors;
/** float array of edge weights */
private float[] edgeWeights;
- /** double array of messages */
- private double[] messages;
@Override
public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
- Map<LongWritable, FloatWritable> edges,
- Iterable<DoubleWritable> messages) {
+ Map<LongWritable, FloatWritable> edges) {
if (vertexId != null) {
id = vertexId.get();
}
@@ -70,12 +62,18 @@ public abstract class LongDoubleFloatDou
n++;
}
}
- this.messages =
- new double[(messages != null) ? Iterables.size(messages) : 0];
- if (messages != null) {
+ }
+
+ @Override
+ public void setEdges(Map<LongWritable, FloatWritable> edges) {
+ neighbors = new long[(edges != null) ? edges.size() : 0];
+ edgeWeights = new float[(edges != null) ? edges.size() : 0];
+ if (edges != null) {
int n = 0;
- for (DoubleWritable message : messages) {
- this.messages[n++] = message.get();
+ for (Entry<LongWritable, FloatWritable> neighbor : edges.entrySet()) {
+ neighbors[n] = neighbor.getKey().get();
+ edgeWeights[n] = neighbor.getValue().get();
+ n++;
}
}
}
@@ -135,30 +133,6 @@ public abstract class LongDoubleFloatDou
}
@Override
- public Iterable<DoubleWritable> getMessages() {
- return new Iterable<DoubleWritable>() {
- @Override
- public Iterator<DoubleWritable> iterator() {
- return new UnmodifiableDoubleArrayIterator(messages);
- }
- };
- }
-
- @Override
- public void putMessages(Iterable<DoubleWritable> newMessages) {
- messages = new double[Iterables.size(newMessages)];
- int n = 0;
- for (DoubleWritable message : newMessages) {
- messages[n++] = message.get();
- }
- }
-
- @Override
- void releaseResources() {
- messages = new double[0];
- }
-
- @Override
public void write(final DataOutput out) throws IOException {
out.writeLong(id);
out.writeDouble(value);
@@ -169,10 +143,6 @@ public abstract class LongDoubleFloatDou
for (int n = 0; n < edgeWeights.length; n++) {
out.writeFloat(edgeWeights[n]);
}
- out.writeInt(messages.length);
- for (int n = 0; n < messages.length; n++) {
- out.writeDouble(messages[n]);
- }
}
@Override
@@ -188,11 +158,6 @@ public abstract class LongDoubleFloatDou
for (int n = 0; n < numEdges; n++) {
edgeWeights[n] = in.readFloat();
}
- int numMessages = in.readInt();
- messages = new double[numMessages];
- for (int n = 0; n < numMessages; n++) {
- messages[n] = in.readDouble();
- }
}
@Override
@@ -204,11 +169,6 @@ public abstract class LongDoubleFloatDou
}
@Override
- public int getNumMessages() {
- return messages.length;
- }
-
- @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleFloatDoubleVertex.java Thu Oct 11 16:48:45 2012
@@ -21,7 +21,6 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
-import org.apache.mahout.math.function.DoubleProcedure;
import org.apache.mahout.math.function.LongFloatProcedure;
import org.apache.mahout.math.list.DoubleArrayList;
import org.apache.mahout.math.map.OpenLongFloatHashMap;
@@ -47,24 +46,13 @@ public abstract class LongDoubleFloatDou
/** Stores the edges */
private OpenLongFloatHashMap edgeMap =
new OpenLongFloatHashMap();
- /** Message list storage */
- private DoubleArrayList messageList = new DoubleArrayList();
@Override
- public void initialize(LongWritable id, DoubleWritable value,
- Map<LongWritable, FloatWritable> edges,
- Iterable<DoubleWritable> messages) {
- super.initialize(id, value);
+ public void setEdges(Map<LongWritable, FloatWritable> edges) {
if (edges != null) {
for (Map.Entry<LongWritable, FloatWritable> edge :
edges.entrySet()) {
- edgeMap.put(edge.getKey().get(),
- edge.getValue().get());
- }
- }
- if (messages != null) {
- for (DoubleWritable m : messages) {
- messageList.add(m.get());
+ edgeMap.put(edge.getKey().get(), edge.getValue().get());
}
}
}
@@ -144,17 +132,13 @@ public abstract class LongDoubleFloatDou
public final void readFields(DataInput in) throws IOException {
long id = in.readLong();
double value = in.readDouble();
- super.initialize(new LongWritable(id), new DoubleWritable(value));
+ initialize(new LongWritable(id), new DoubleWritable(value));
long edgeMapSize = in.readLong();
for (long i = 0; i < edgeMapSize; ++i) {
long targetVertexId = in.readLong();
float edgeValue = in.readFloat();
edgeMap.put(targetVertexId, edgeValue);
}
- long messageListSize = in.readLong();
- for (long i = 0; i < messageListSize; ++i) {
- messageList.add(in.readDouble());
- }
boolean halt = in.readBoolean();
if (halt) {
voteToHalt();
@@ -181,46 +165,9 @@ public abstract class LongDoubleFloatDou
return true;
}
});
- out.writeLong(messageList.size());
- messageList.forEach(new DoubleProcedure() {
- @Override
- public boolean apply(double message) {
- try {
- out.writeDouble(message);
- } catch (IOException e) {
- throw new IllegalStateException(
- "apply: IOException when not allowed", e);
- }
- return true;
- }
- });
out.writeBoolean(isHalted());
}
- @Override
- void putMessages(Iterable<DoubleWritable> messages) {
- messageList.clear();
- for (DoubleWritable message : messages) {
- messageList.add(message.get());
- }
- }
-
- @Override
- void releaseResources() {
- // Hint to GC to free the messages
- messageList.clear();
- }
-
- @Override
- public int getNumMessages() {
- return messageList.size();
- }
-
- @Override
- public Iterable<DoubleWritable> getMessages() {
- return new UnmodifiableDoubleWritableIterable(messageList);
- }
-
/**
* Helper iterable over the messages.
*/
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/LongDoubleNullDoubleVertex.java Thu Oct 11 16:48:45 2012
@@ -21,36 +21,28 @@ package org.apache.giraph.graph;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.Iterator;
import java.util.Map;
-import org.apache.giraph.utils.UnmodifiableDoubleArrayIterator;
import org.apache.giraph.utils.UnmodifiableLongNullEdgeArrayIterable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import com.google.common.collect.Iterables;
-
/**
* Compact vertex representation with primitive arrays and null edges.
*/
public abstract class LongDoubleNullDoubleVertex extends
Vertex<LongWritable, DoubleWritable, NullWritable, DoubleWritable> {
-
/** long represented vertex id */
private long id;
/** double represented vertex value */
private double value;
/** long array of neighbor vertex ids */
private long[] neighbors;
- /** double array of messages */
- private double[] messages;
@Override
public void initialize(LongWritable vertexId, DoubleWritable vertexValue,
- Map<LongWritable, NullWritable> edges,
- Iterable<DoubleWritable> messages) {
+ Map<LongWritable, NullWritable> edges) {
id = vertexId.get();
value = vertexValue.get();
neighbors = new long[(edges != null) ? edges.size() : 0];
@@ -60,14 +52,6 @@ public abstract class LongDoubleNullDoub
neighbors[n++] = neighbor.get();
}
}
- this.messages =
- new double[(messages != null) ? Iterables.size(messages) : 0];
- if (messages != null) {
- n = 0;
- for (DoubleWritable message : messages) {
- this.messages[n++] = message.get();
- }
- }
}
@Override
@@ -118,30 +102,6 @@ public abstract class LongDoubleNullDoub
}
@Override
- public Iterable<DoubleWritable> getMessages() {
- return new Iterable<DoubleWritable>() {
- @Override
- public Iterator<DoubleWritable> iterator() {
- return new UnmodifiableDoubleArrayIterator(messages);
- }
- };
- }
-
- @Override
- public void putMessages(Iterable<DoubleWritable> newMessages) {
- messages = new double[Iterables.size(newMessages)];
- int n = 0;
- for (DoubleWritable message : newMessages) {
- messages[n++] = message.get();
- }
- }
-
- @Override
- void releaseResources() {
- messages = new double[0];
- }
-
- @Override
public void write(final DataOutput out) throws IOException {
out.writeLong(id);
out.writeDouble(value);
@@ -149,10 +109,6 @@ public abstract class LongDoubleNullDoub
for (int n = 0; n < neighbors.length; n++) {
out.writeLong(neighbors[n]);
}
- out.writeInt(messages.length);
- for (int n = 0; n < messages.length; n++) {
- out.writeDouble(messages[n]);
- }
}
@Override
@@ -164,10 +120,5 @@ public abstract class LongDoubleNullDoub
for (int n = 0; n < numEdges; n++) {
neighbors[n] = in.readLong();
}
- int numMessages = in.readInt();
- messages = new double[numMessages];
- for (int n = 0; n < numMessages; n++) {
- messages[n] = in.readDouble();
- }
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/MutableVertex.java Thu Oct 11 16:48:45 2012
@@ -62,15 +62,14 @@ public abstract class MutableVertex<I ex
* @param vertexId Id of the new vertex.
* @param vertexValue Value of the new vertex.
* @param edges Map of edges to be added to this vertex.
- * @param messages Messages to be added to the vertex (typically empty)
* @return A new vertex for adding to the graph
*/
public Vertex<I, V, E, M> instantiateVertex(
- I vertexId, V vertexValue, Map<I, E> edges, Iterable<M> messages) {
+ I vertexId, V vertexValue, Map<I, E> edges) {
MutableVertex<I, V, E, M> mutableVertex =
(MutableVertex<I, V, E, M>) getConf().createVertex();
mutableVertex.setGraphState(getGraphState());
- mutableVertex.initialize(vertexId, vertexValue, edges, messages);
+ mutableVertex.initialize(vertexId, vertexValue, edges);
return mutableVertex;
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleMutableVertex.java Thu Oct 11 16:48:45 2012
@@ -28,10 +28,9 @@ import com.google.common.collect.Iterabl
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Mutable vertex with no edge values.
@@ -43,6 +42,18 @@ public abstract class SimpleMutableVerte
V extends Writable, M extends Writable> extends MutableVertex<I, V,
NullWritable, M> {
/**
+ * Set the neighbors of this vertex.
+ *
+ * @param neighbors Set of destination vertex ids.
+ */
+ public abstract void setNeighbors(Set<I> neighbors);
+
+ @Override
+ public void setEdges(Map<I, NullWritable> edges) {
+ setNeighbors(edges.keySet());
+ }
+
+ /**
* Get a read-only view of the neighbors of this
* vertex, i.e. the target vertices of its out-edges.
*
@@ -105,14 +116,7 @@ public abstract class SimpleMutableVerte
edges.put(targetVertexId, NullWritable.get());
}
- int numMessages = in.readInt();
- List<M> messages = new ArrayList<M>(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- M message = (M) getConf().createMessageValue();
- message.readFields(in);
- messages.add(message);
- }
- initialize(vertexId, vertexValue, edges, messages);
+ initialize(vertexId, vertexValue, edges);
boolean halt = in.readBoolean();
if (halt) {
@@ -132,11 +136,6 @@ public abstract class SimpleMutableVerte
neighbor.write(out);
}
- out.writeInt(getNumMessages());
- for (M message : getMessages()) {
- message.write(out);
- }
-
out.writeBoolean(isHalted());
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/SimpleVertex.java Thu Oct 11 16:48:45 2012
@@ -28,10 +28,9 @@ import com.google.common.collect.Iterabl
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
@@ -45,6 +44,18 @@ public abstract class SimpleVertex<I ext
V extends Writable, M extends Writable> extends Vertex<I, V,
NullWritable, M> {
/**
+ * Set the neighbors of this vertex.
+ *
+ * @param neighbors Set of destination vertex ids.
+ */
+ public abstract void setNeighbors(Set<I> neighbors);
+
+ @Override
+ public void setEdges(Map<I, NullWritable> edges) {
+ setNeighbors(edges.keySet());
+ }
+
+ /**
* Get a read-only view of the neighbors of this
* vertex, i.e. the target vertices of its out-edges.
*
@@ -84,14 +95,7 @@ public abstract class SimpleVertex<I ext
edges.put(targetVertexId, NullWritable.get());
}
- int numMessages = in.readInt();
- List<M> messages = new ArrayList<M>(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- M message = getConf().createMessageValue();
- message.readFields(in);
- messages.add(message);
- }
- initialize(vertexId, vertexValue, edges, messages);
+ initialize(vertexId, vertexValue, edges);
boolean halt = in.readBoolean();
if (halt) {
@@ -111,11 +115,6 @@ public abstract class SimpleVertex<I ext
neighbor.write(out);
}
- out.writeInt(getNumMessages());
- for (M message : getMessages()) {
- message.write(out);
- }
-
out.writeBoolean(isHalted());
}
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/Vertex.java Thu Oct 11 16:48:45 2012
@@ -25,13 +25,12 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapreduce.Mapper;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
/**
@@ -58,7 +57,6 @@ public abstract class Vertex<I extends W
/** Configuration */
private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
-
/**
* This method must be called after instantiation of a vertex
* with ImmutableClassesGiraphConfiguration
@@ -67,15 +65,17 @@ public abstract class Vertex<I extends W
*
* @param id Will be the vertex id
* @param value Will be the vertex value
- * @param edges A map of destination edge ids to edge values (can be null)
- * @param messages Initial messages for this vertex (can be null)
+ * @param edges A map of destination vertex ids to edge values
*/
- public abstract void initialize(
- I id, V value, Map<I, E> edges, Iterable<M> messages);
+ public void initialize(I id, V value, Map<I, E> edges) {
+ this.id = id;
+ this.value = value;
+ setEdges(edges);
+ }
/**
- * This method must be called once by the subclass's initialize() or by
- * readFields() in order to set id and value.
+ * This method only sets id and value. Can be used by Vertex
+ * implementations in readFields().
*
* @param id Vertex id
* @param value Vertex value
@@ -83,9 +83,17 @@ public abstract class Vertex<I extends W
public void initialize(I id, V value) {
this.id = id;
this.value = value;
+ setEdges(Maps.<I, E>newHashMapWithExpectedSize(0));
}
/**
+ * Set the outgoing edges for this vertex.
+ *
+ * @param edges Map of destination vertices to edge values
+ */
+ public abstract void setEdges(Map<I, E> edges);
+
+ /**
* Must be defined by user to do computation on a single Vertex.
*
* @param messages Messages that were sent to this vertex in the previous
@@ -253,35 +261,6 @@ public abstract class Vertex<I extends W
}
/**
- * Get the list of incoming messages from the previous superstep. Same as
- * the message iterator passed to compute().
- *
- * @return Messages received.
- */
- public abstract Iterable<M> getMessages();
-
- /**
- * Get the number of messages from the previous superstep.
- * @return Number of messages received.
- */
- public int getNumMessages() {
- return Iterables.size(getMessages());
- }
-
- /**
- * Copy the messages this vertex should process in the current superstep
- *
- * @param messages the messages sent to this vertex in the previous superstep
- */
- abstract void putMessages(Iterable<M> messages);
-
- /**
- * Release unnecessary resources (will be called after vertex returns from
- * {@link #compute(Iterable)})
- */
- abstract void releaseResources();
-
- /**
* Get the graph state for all workers.
*
* @return Graph state for all workers
@@ -295,7 +274,7 @@ public abstract class Vertex<I extends W
*
* @param graphState Graph state for all workers
*/
- public void setGraphState(GraphState<I, V, E, M> graphState) {
+ void setGraphState(GraphState<I, V, E, M> graphState) {
this.graphState = graphState;
}
@@ -346,14 +325,7 @@ public abstract class Vertex<I extends W
edges.put(targetVertexId, edgeValue);
}
- int numMessages = in.readInt();
- List<M> messages = new ArrayList<M>(numMessages);
- for (int i = 0; i < numMessages; ++i) {
- M message = (M) getConf().createMessageValue();
- message.readFields(in);
- messages.add(message);
- }
- initialize(vertexId, vertexValue, edges, messages);
+ initialize(vertexId, vertexValue, edges);
halt = in.readBoolean();
}
@@ -369,11 +341,6 @@ public abstract class Vertex<I extends W
edge.getValue().write(out);
}
- out.writeInt(getNumMessages());
- for (M message : getMessages()) {
- message.write(out);
- }
-
out.writeBoolean(halt);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Thu Oct 11 16:48:45 2012
@@ -88,7 +88,6 @@ public class VertexResolver<I extends Wr
vertex = instantiateVertex();
vertex.initialize(vertexId,
getConf().createVertexValue(),
- null,
null);
}
} else {
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java Thu Oct 11 16:48:45 2012
@@ -33,7 +33,7 @@ public class WorkerInfo implements Writa
private String hostname;
/** Task Partition (Worker) ID of this worker */
private int taskId = -1;
- /** Port that the RPC server is using */
+ /** Port that the IPC server is using */
private int port = -1;
/** Hostname + "_" + id for easier debugging */
private String hostnameId;
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/PseudoRandomVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -170,7 +170,7 @@ public class PseudoRandomVertexInputForm
} while (edges.containsKey(destVertexId));
edges.put(destVertexId, new DoubleWritable(rand.nextDouble()));
}
- vertex.initialize(new LongWritable(vertexId), vertexValue, edges, null);
+ vertex.initialize(new LongWritable(vertexId), vertexValue, edges);
++verticesRead;
if (LOG.isTraceEnabled()) {
LOG.trace("next: Return vertexId=" +
Modified: giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/io/TextVertexInputFormat.java Thu Oct 11 16:48:45 2012
@@ -178,7 +178,7 @@ public abstract class TextVertexInputFor
InterruptedException {
Text line = getRecordReader().getCurrentValue();
Vertex<I, V, E, M> vertex = getConf().createVertex();
- vertex.initialize(getId(line), getValue(line), getEdges(line), null);
+ vertex.initialize(getId(line), getValue(line), getEdges(line));
return vertex;
}
@@ -248,7 +248,7 @@ public abstract class TextVertexInputFor
T processed = preprocessLine(line);
vertex = getConf().createVertex();
vertex.initialize(getId(processed), getValue(processed),
- getEdges(processed), null);
+ getEdges(processed));
return vertex;
}
@@ -335,7 +335,7 @@ public abstract class TextVertexInputFor
Configuration conf = getContext().getConfiguration();
vertex = getConf().createVertex();
vertex.initialize(getId(processed), getValue(processed),
- getEdges(processed), null);
+ getEdges(processed));
} catch (IOException e) {
throw e;
// CHECKSTYLE: stop IllegalCatch
Modified: giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java Thu Oct 11 16:48:45 2012
@@ -117,7 +117,7 @@ public class RequestTest {
IntWritable, IntWritable>>();
for (int i = 0; i < 10; ++i) {
TestVertex vertex = new TestVertex();
- vertex.initialize(new IntWritable(i), new IntWritable(i), null, null);
+ vertex.initialize(new IntWritable(i), new IntWritable(i));
vertices.add(vertex);
}
@@ -210,7 +210,7 @@ public class RequestTest {
IntWritable, IntWritable>();
for (int j = 0; j < 3; ++j) {
TestVertex vertex = new TestVertex();
- vertex.initialize(new IntWritable(i), new IntWritable(j), null, null);
+ vertex.initialize(new IntWritable(i), new IntWritable(j));
mutations.addVertex(vertex);
}
for (int j = 0; j < 2; ++j) {
Modified: giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathsVertexTest.java Thu Oct 11 16:48:45 2012
@@ -52,7 +52,7 @@ public class SimpleShortestPathsVertexTe
public void testOnShorterPathFound() throws Exception {
SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
- vertex.initialize(null, null, null, null);
+ vertex.initialize(null, null);
vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
@@ -83,7 +83,7 @@ public class SimpleShortestPathsVertexTe
public void testOnNoShorterPathFound() throws Exception {
SimpleShortestPathsVertex vertex = new SimpleShortestPathsVertex();
- vertex.initialize(null, null, null, null);
+ vertex.initialize(null, null);
vertex.addEdge(new LongWritable(10L), new FloatWritable(2.5f));
vertex.addEdge(new LongWritable(20L), new FloatWritable(0.5f));
Modified: giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java?rev=1397159&r1=1397158&r2=1397159&view=diff
==============================================================================
--- giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java (original)
+++ giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleTriangleClosingVertexTest.java Thu Oct 11 16:48:45 2012
@@ -46,7 +46,7 @@ public class SimpleTriangleClosingVertex
new SimpleTriangleClosingVertex();
SimpleTriangleClosingVertex.IntArrayListWritable alw =
new SimpleTriangleClosingVertex.IntArrayListWritable();
- vertex.initialize(null, null, null, null);
+ vertex.initialize(null, null);
vertex.addEdge(new IntWritable(5), NullWritable.get());
vertex.addEdge(new IntWritable(7), NullWritable.get());
@@ -72,7 +72,7 @@ public class SimpleTriangleClosingVertex
// messages properly to verify the algorithm
SimpleTriangleClosingVertex vertex =
new SimpleTriangleClosingVertex();
- vertex.initialize(null, null, null, null);
+ vertex.initialize(null, null);
MockUtils.MockedEnvironment<IntWritable,
SimpleTriangleClosingVertex.IntArrayListWritable,
NullWritable, IntWritable>