You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/08/07 21:56:52 UTC

svn commit: r1370476 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/comm/

Author: aching
Date: Tue Aug  7 19:56:51 2012
New Revision: 1370476

URL: http://svn.apache.org/viewvc?rev=1370476&view=rev
Log:
GIRAPH-262: Netty optimization to handle requests locally whenever
possible. (aching)

Modified:
    giraph/trunk/CHANGELOG
    giraph/trunk/pom.xml
    giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
    giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java

Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Tue Aug  7 19:56:51 2012
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.2.0 - unreleased
 
+  GIRAPH-262: Netty optimization to handle requests locally whenever
+  possible. (aching)
+
   GIRAPH-288: Bandwidth tracking - subset of GIRAPH-262. (aching)
 
   GIRAPH-289: Add thread and channel pooling to NettyClient and

Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Tue Aug  7 19:56:51 2012
@@ -428,7 +428,7 @@ under the License.
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
-        <version>0.7</version>
+        <version>0.8</version>
         <executions>
           <execution>
             <phase>verify</phase>

Modified: giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/benchmark/PseudoRandomVertexInputFormat.java Tue Aug  7 19:56:51 2012
@@ -159,7 +159,8 @@ public class PseudoRandomVertexInputForm
       // same.
       Random rand = new Random(vertexId);
       DoubleWritable vertexValue = new DoubleWritable(rand.nextDouble());
-      Map<LongWritable, DoubleWritable> edges = Maps.newHashMap();
+      Map<LongWritable, DoubleWritable> edges =
+          Maps.newHashMapWithExpectedSize((int) edgesPerVertex);
       for (long i = 0; i < edgesPerVertex; ++i) {
         LongWritable destVertexId = null;
         do {

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java Tue Aug  7 19:56:51 2012
@@ -86,15 +86,19 @@ public class NettyWorkerClient<I extends
   private final int maxMutationsPerPartition;
   /** Messages sent during the last superstep */
   private long totalMsgsSentInSuperstep = 0;
+  /** Server data of the local server, used to run local requests */
+  private final ServerData<I, V, E, M> serverData;
 
   /**
    * Only constructor.
    *
    * @param context Context from mapper
    * @param service Used to get partition mapping
+   * @param serverData Server data (used for local requests)
    */
   public NettyWorkerClient(Mapper<?, ?, ?, ?>.Context context,
-                           CentralizedServiceWorker<I, V, E, M> service) {
+                           CentralizedServiceWorker<I, V, E, M> service,
+                           ServerData<I, V, E, M> serverData) {
     this.nettyClient = new NettyClient<I, V, E, M>(context);
     this.conf = context.getConfiguration();
     this.service = service;
@@ -104,6 +108,7 @@ public class NettyWorkerClient<I extends
         GiraphJob.MAX_MUTATIONS_PER_REQUEST_DEFAULT);
     sendMessageCache = new SendMessageCache<I, M>(conf);
     sendMutationsCache = new SendMutationsCache<I, V, E, M>();
+    this.serverData = serverData;
   }
 
   @Override
@@ -167,6 +172,23 @@ public class NettyWorkerClient<I extends
         partitionOwner.getPartitionId());
   }
 
+  /**
+   * Do the request: run on serverData if local, else send via nettyClient.
+   *
+   * @param remoteServerAddress Destination, compared to this worker's address
+   * @param writableRequest Request to run locally or send to the remote server
+   */
+  private void doRequest(InetSocketAddress remoteServerAddress,
+                         WritableRequest<I, V, E, M> writableRequest) {
+    // Destination equals this worker's address: run directly on serverData
+    if (service.getWorkerInfo().getHostnamePort().equals(
+        remoteServerAddress)) {
+      writableRequest.doRequest(serverData);
+    } else {
+      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+    }
+  }
+
   @Override
   public void sendMessageRequest(I destVertexId, M message) {
     PartitionOwner partitionOwner =
@@ -191,7 +213,7 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMessagesRequest<I, V, E, M>(
               partitionId, partitionMessages);
-      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+      doRequest(remoteServerAddress, writableRequest);
     }
   }
 
@@ -209,7 +231,7 @@ public class NettyWorkerClient<I extends
     WritableRequest<I, V, E, M> vertexRequest =
         new SendVertexRequest<I, V, E, M>(partitionId,
             partition.getVertices());
-    nettyClient.sendWritableRequest(remoteServerAddress, vertexRequest);
+    doRequest(remoteServerAddress, vertexRequest);
 
     // messages are stored separately
     MessageStoreByPartition<I, M> messageStore =
@@ -229,7 +251,7 @@ public class NettyWorkerClient<I extends
       if (messagesInMap > maxMessagesPerPartition) {
         WritableRequest<I, V, E, M> messagesRequest = new
             SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-        nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+        doRequest(remoteServerAddress, messagesRequest);
         map.clear();
         messagesInMap = 0;
       }
@@ -237,18 +259,18 @@ public class NettyWorkerClient<I extends
     if (!map.isEmpty()) {
       WritableRequest<I, V, E, M> messagesRequest = new
           SendPartitionCurrentMessagesRequest<I, V, E, M>(partitionId, map);
-      nettyClient.sendWritableRequest(remoteServerAddress, messagesRequest);
+      doRequest(remoteServerAddress, messagesRequest);
     }
   }
 
-  /**
-   * Send a mutations request if the maximum number of mutations per partition
-   * was met.
-   *
-   * @param partitionId Partition id
-   * @param partitionOwner Owner of the partition
-   * @param partitionMutationCount Number of mutations for this partition
-   */
+    /**
+    * Send a mutations request if the maximum number of mutations per partition
+    * was met.
+    *
+    * @param partitionId Partition id
+    * @param partitionOwner Owner of the partition
+    * @param partitionMutationCount Number of mutations for this partition
+    */
   private void sendMutationsRequestIfFull(
       int partitionId, PartitionOwner partitionOwner,
       int partitionMutationCount) {
@@ -261,7 +283,7 @@ public class NettyWorkerClient<I extends
       WritableRequest<I, V, E, M> writableRequest =
           new SendPartitionMutationsRequest<I, V, E, M>(
               partitionId, partitionMutations);
-      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+      doRequest(remoteServerAddress, writableRequest);
     }
   }
 
@@ -352,7 +374,7 @@ public class NettyWorkerClient<I extends
               entry.getKey(), entry.getValue());
       InetSocketAddress remoteServerAddress =
           getInetSocketAddress(entry.getValue().keySet().iterator().next());
-      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+      doRequest(remoteServerAddress, writableRequest);
     }
 
     // Execute the remaining sends mutations (if any)
@@ -365,7 +387,7 @@ public class NettyWorkerClient<I extends
               entry.getKey(), entry.getValue());
       InetSocketAddress remoteServerAddress =
           getInetSocketAddress(entry.getValue().keySet().iterator().next());
-      nettyClient.sendWritableRequest(remoteServerAddress, writableRequest);
+      doRequest(remoteServerAddress, writableRequest);
     }
 
     nettyClient.waitAllRequests();

Modified: giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java?rev=1370476&r1=1370475&r2=1370476&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java Tue Aug  7 19:56:51 2012
@@ -56,9 +56,10 @@ public class NettyWorkerClientServer<I e
    */
   public NettyWorkerClientServer(Mapper<?, ?, ?, ?>.Context context,
       CentralizedServiceWorker<I, V, E, M> service) {
-    client = new NettyWorkerClient<I, V, E, M>(context, service);
     server = new NettyWorkerServer<I, V, E, M>(context.getConfiguration(),
-                                               service);
+        service);
+    client = new NettyWorkerClient<I, V, E, M>(context, service,
+       ((NettyWorkerServer<I, V, E, M>) server).getServerData());
   }
 
   @Override