You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/07 21:56:52 UTC
svn commit: r1370476 - in /giraph/trunk: ./
src/main/java/org/apache/giraph/benchmark/
src/main/java/org/apache/giraph/comm/
Author: aching
Date: Tue Aug 7 19:56:51 2012
New Revision: 1370476
URL: http://svn.apache.org/viewvc?rev=1370476&view=rev
Log:
GIRAPH-262: Netty optimization to handle requests locally whenever
possible. (aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug 7 19:56:51 2012
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-262: Netty optimization to handle requests locally whenever
+ possible. (aching)
+
GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)
GIRAPH-289: Add thread and channel pooling to NettyClient and
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Tue Aug 7 19:56:51 2012
@@ -428,7 +428,7 @@ under the License.
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
- <version>0.7</version>
+ <version>0.8</version>
<executions>
<execution>
<phase>verify</phase>
Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java Tue Aug 7 19:56:51 2012
@@ -159,7 +159,8 @@ public class PseudoRandomVertexInputForm
// same.
Random rand = new Random(vertexId);
DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
- Map<LongWritable, DoubleWritable> edges = Maps.newHashMap();
+ Map<LongWritable, DoubleWritable> edges =
+ Maps.newHashMapWithExpectedSize((int) edgesPerVertex);
for (long i = 0; i < edgesPerVertex; ++i) {
LongWritable destVertexId = null;
do {
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Tue Aug 7 19:56:51 2012
@@ -86,15 +86,19 @@ public class NettyWorkerClient<I extends
private final int maxMutationsPerPartition;
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
+ /** Server data from the server */
+ private final ServerData<I, V, E, M> serverData;
/**
* Only constructor.
*
* @param context Context from mapper
* @param service Used to get partition mapping
+ * @param serverData Server data (used for local requests)
*/
public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E, M> service) {
+ CentralizedServiceWorker<I, V, E, M> service,
+ ServerData<I, V, E, M> serverData) {
this.nettyClient = new NettyClient<I, V, E, M>(context);
this.conf = context.getConfiguration();
this.service = service;
@@ -104,6 +108,7 @@ public class NettyWorkerClient<I extends
GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
sendMessageCache = new SendMessageCache<I, M>(conf);
sendMutationsCache = new SendMutationsCache<I, V, E, M>();
+ this.serverData = serverData;
}
@Override
@@ -167,6 +172,23 @@ public class NettyWorkerClient<I extends
partitionOwner.getPartitionId());
}
+ /**
+ * When doing the request, short circuit if it is local
+ *
+ * @param remoteServerAddress Remote server address (checked against local)
+ * @param writableRequest Request to either submit or run locally
+ */
+ private void doRequest(InetSocketAddress remoteServerAddress,
+ WritableRequest<I, V, E, M> writableRequest) {
+ // If this is local, execute locally
+ if (service.getWorkerInfo().getHostnamePort().equals(
+ remoteServerAddress)) {
+ writableRequest.doRequest(serverData);
+ } else {
+ nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ }
+ }
+
@Override
public void sendMessageRequest(I destVertexId, M message) {
PartitionOwner partitionOwner =
@@ -191,7 +213,7 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMessagesRequest<I, V, E, M>(
partitionId, partitionMessages);
- nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ doRequest(remoteServerAddress, writableRequest);
}
}
@@ -209,7 +231,7 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> vertexRequest =
new SendVertexRequest<I, V, E, M>(partitionId,
partition.getVertices());
- nettyClient.sendWritableRequest(remoteServerAddress, vertexRequest);
+ doRequest(remoteServerAddress, vertexRequest);
// messages are stored separately
MessageStoreByPartition<I, M> messageStore =
@@ -229,7 +251,7 @@ public class NettyWorkerClient<I extends
if (messagesInMap > maxMessagesPerPartition) {
WritableRequest<I, V, E, M> messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+ doRequest(remoteServerAddress, messagesRequest);
map.clear();
messagesInMap = 0;
}
@@ -237,18 +259,18 @@ public class NettyWorkerClient<I extends
if (!map.isEmpty()) {
WritableRequest<I, V, E, M> messagesRequest = new
SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
- nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+ doRequest(remoteServerAddress, messagesRequest);
}
}
- /**
- * Send a mutations request if the maximum number of mutations per partition
- * was met.
- *
- * @param partitionId Partition id
- * @param partitionOwner Owner of the partition
- * @param partitionMutationCount Number of mutations for this partition
- */
+ /**
+ * Send a mutations request if the maximum number of mutations per partition
+ * was met.
+ *
+ * @param partitionId Partition id
+ * @param partitionOwner Owner of the partition
+ * @param partitionMutationCount Number of mutations for this partition
+ */
private void sendMutationsRequestIfFull(
int partitionId, PartitionOwner partitionOwner,
int partitionMutationCount) {
@@ -261,7 +283,7 @@ public class NettyWorkerClient<I extends
WritableRequest<I, V, E, M> writableRequest =
new SendPartitionMutationsRequest<I, V, E, M>(
partitionId, partitionMutations);
- nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ doRequest(remoteServerAddress, writableRequest);
}
}
@@ -352,7 +374,7 @@ public class NettyWorkerClient<I extends
entry.getKey(), entry.getValue());
InetSocketAddress remoteServerAddress =
getInetSocketAddress(entry.getValue().keySet().iterator().next());
- nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ doRequest(remoteServerAddress, writableRequest);
}
// Execute the remaining sends mutations (if any)
@@ -365,7 +387,7 @@ public class NettyWorkerClient<I extends
entry.getKey(), entry.getValue());
InetSocketAddress remoteServerAddress =
getInetSocketAddress(entry.getValue().keySet().iterator().next());
- nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+ doRequest(remoteServerAddress, writableRequest);
}
nettyClient.waitAllRequests();
Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java Tue Aug 7 19:56:51 2012
@@ -56,9 +56,10 @@ public class NettyWorkerClientServer<I e
*/
public NettyWorkerClientServer(Mapper<?, ?, ?, ?>.Context context,
CentralizedServiceWorker<I, V, E, M> service) {
- client = new NettyWorkerClient<I, V, E, M>(context, service);
server = new NettyWorkerServer<I, V, E, M>(context.getConfiguration(),
- service);
+ service);
+ client = new NettyWorkerClient<I, V, E, M>(context, service,
+ ((NettyWorkerServer<I, V, E, M>) server).getServerData());
}
@Override