Posted to dev@giraph.apache.org by Avery Ching <av...@gmail.com> on 2012/05/09 11:22:36 UTC

Review Request: Implemented a Netty client/server protocol as a faster alternative to Hadoop RPC (3x improvement)

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/5074/
-----------------------------------------------------------

Review request for giraph.


Summary
-------

* Implemented a request/response protocol with Netty as a NettyClient and NettyServer. NettyWorkerClient and NettyWorkerServer implement WorkerClient and WorkerServer, respectively. Netty is much faster because it is non-blocking, which lets us interleave computation and communication, whereas Hadoop RPC blocks.
* The Netty server implementation uses concurrent hash maps instead of synchronized blocks around maps, which improves concurrency.
* Netty is used by default; Hadoop RPC can be selected with -Dgiraph.useNetty=false.
* Changed the class hierarchy from ServerInterface to WorkerClientServer (WorkerClient and WorkerServer) to support a request/response protocol rather than only RPC.
* With Netty, messages and mutations are grouped by partition and sent out one partition's worth at a time.
* Added two new test classes (RequestTest.java and ConnectionTest.java) that exercise every request type and check Netty connections.
* PageRankBenchmark now uses EdgeListVertex by default.
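The partition batching and concurrent-map points above can be illustrated with a minimal sketch. It assumes simple hash partitioning by vertex id and double-valued messages; the class and method names (MessageCache, ServerMessageStore, addPartitionMessages) are hypothetical and are not the SendMessageCache or ServerData classes from the patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: client-side batching by partition plus server-side
// merging into a ConcurrentHashMap. Not the classes from r/5074.
public class PartitionBatchingSketch {

  /** Client side (single-threaded for simplicity): buffers per partition. */
  static class MessageCache {
    private final int numPartitions;
    private final int maxMessagesPerPartition;
    // partition id -> (destination vertex id -> buffered messages)
    private final Map<Integer, Map<Long, List<Double>>> byPartition = new HashMap<>();
    private final Map<Integer, Integer> counts = new HashMap<>();

    MessageCache(int numPartitions, int maxMessagesPerPartition) {
      this.numPartitions = numPartitions;
      this.maxMessagesPerPartition = maxMessagesPerPartition;
    }

    /** Assumed hash partitioning by vertex id. */
    int partitionOf(long vertexId) {
      return (int) Math.floorMod(vertexId, (long) numPartitions);
    }

    /** Buffers a message; returns the partition's batch once it is full, else null. */
    Map<Long, List<Double>> add(long dstVertexId, double message) {
      int p = partitionOf(dstVertexId);
      byPartition.computeIfAbsent(p, k -> new HashMap<>())
          .computeIfAbsent(dstVertexId, k -> new ArrayList<>())
          .add(message);
      int n = counts.merge(p, 1, Integer::sum);
      if (n < maxMessagesPerPartition) {
        return null;
      }
      counts.remove(p);
      return byPartition.remove(p);  // caller sends this as one request
    }

    /** Drains all partially filled partitions, e.g. at the end of a superstep. */
    List<Map<Long, List<Double>>> flushAll() {
      List<Map<Long, List<Double>>> batches = new ArrayList<>(byPartition.values());
      byPartition.clear();
      counts.clear();
      return batches;
    }
  }

  /** Server side: merges incoming batches without synchronized blocks. */
  static class ServerMessageStore {
    private final ConcurrentHashMap<Long, Queue<Double>> inbox = new ConcurrentHashMap<>();

    void addPartitionMessages(Map<Long, List<Double>> batch) {
      for (Map.Entry<Long, List<Double>> e : batch.entrySet()) {
        // computeIfAbsent is atomic on ConcurrentHashMap and the queue is
        // thread-safe, so concurrent request handlers need no external lock.
        inbox.computeIfAbsent(e.getKey(), k -> new ConcurrentLinkedQueue<>())
            .addAll(e.getValue());
      }
    }

    int messageCount(long vertexId) {
      Queue<Double> q = inbox.get(vertexId);
      return q == null ? 0 : q.size();
    }
  }

  public static void main(String[] args) {
    MessageCache cache = new MessageCache(2, 3);
    ServerMessageStore server = new ServerMessageStore();
    // Vertices 0, 2 and 4 map to partition 0; the third message fills it.
    long[] targets = {0, 1, 2, 4};
    for (long v : targets) {
      Map<Long, List<Double>> batch = cache.add(v, 1.0);
      if (batch != null) {
        server.addPartitionMessages(batch);
      }
    }
    System.out.println(server.messageCount(0) + " " + server.messageCount(1)); // prints "1 0"
    for (Map<Long, List<Double>> batch : cache.flushAll()) {
      server.addPartitionMessages(batch);
    }
    System.out.println(server.messageCount(0) + " " + server.messageCount(1)); // prints "1 1"
  }
}
```

A real implementation would also have to make the client cache safe for concurrent compute threads and size batches by bytes rather than message count; this sketch does neither.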


This addresses bug GIRAPH-37.
    https://issues.apache.org/jira/browse/GIRAPH-37


Diffs
-----

  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMutationsCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestEncoder.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java PRE-CREATION 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/pom.xml 1332888 
  http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java 1332888 

Diff: https://reviews.apache.org/r/5074/diff
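The diff introduces RequestEncoder, RequestDecoder, RequestRegistry and WritableRequest, but this thread does not describe their wire format. As a hedged illustration only, here is a minimal sketch of a type-tagged, length-prefixed framing scheme of the kind such an encoder/decoder pair might use. The frame layout and the request-type constants are assumptions, not the patch's actual format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical wire format: [int length][byte requestType][payload bytes],
// where length counts the type byte plus the payload.
public class RequestFramingSketch {

  /** Hypothetical request-type ids, standing in for a request registry. */
  static final byte SEND_VERTEX = 0;
  static final byte SEND_PARTITION_MESSAGES = 1;

  /** A decoded request: its type tag plus the serialized body. */
  static final class Frame {
    final byte type;
    final byte[] payload;

    Frame(byte type, byte[] payload) {
      this.type = type;
      this.payload = payload;
    }
  }

  static byte[] encode(byte type, byte[] payload) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(1 + payload.length);  // type byte + payload
      out.writeByte(type);
      out.write(payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);  // not expected for in-memory streams
    }
    return bytes.toByteArray();
  }

  /** Reads one complete frame; a streaming decoder would wait for more bytes. */
  static Frame decode(DataInputStream in) {
    try {
      int length = in.readInt();
      byte type = in.readByte();
      byte[] payload = new byte[length - 1];
      in.readFully(payload);
      return new Frame(type, payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);  // e.g. EOFException on a short frame
    }
  }

  public static void main(String[] args) {
    // Two requests written back to back on the same "connection".
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    byte[] a = encode(SEND_PARTITION_MESSAGES, new byte[] {7, 8, 9});
    byte[] b = encode(SEND_VERTEX, new byte[] {42});
    wire.write(a, 0, a.length);
    wire.write(b, 0, b.length);

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
    Frame first = decode(in);
    Frame second = decode(in);
    System.out.println(first.type + " " + first.payload.length + " "
        + second.type + " " + second.payload.length); // prints "1 3 0 1"
  }
}
```

The length prefix is what lets a non-blocking reader know when a full request has arrived; the type byte lets the receiver look up which request class should deserialize the payload.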


Testing
-------

'mvn verify' passes.  I did several runs to gather performance results.  Here is a representative example:

Hadoop RPC:
hadoop jar ~/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.useNetty=false -w 5 -V 5000000 -s 5 -e 2 -v

12/05/09 01:59:56 INFO mapred.JobClient:   Giraph Timers
12/05/09 01:59:56 INFO mapred.JobClient:     Total (milliseconds)=167722
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 3 (milliseconds)=24775
12/05/09 01:59:56 INFO mapred.JobClient:     Setup (milliseconds)=2930
12/05/09 01:59:56 INFO mapred.JobClient:     Shutdown (milliseconds)=181
12/05/09 01:59:56 INFO mapred.JobClient:     Vertex input superstep (milliseconds)=51025
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 0 (milliseconds)=21543
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 4 (milliseconds)=19858
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 5 (milliseconds)=2844
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 2 (milliseconds)=24507
12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 1 (milliseconds)=20054

Netty:
hadoop jar ~/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.useNetty=true -w 5 -V 5000000 -s 5 -e 2 -v

12/05/09 02:06:10 INFO mapred.JobClient:   Giraph Timers
12/05/09 02:06:10 INFO mapred.JobClient:     Total (milliseconds)=57795
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 3 (milliseconds)=7636
12/05/09 02:06:10 INFO mapred.JobClient:     Setup (milliseconds)=3574
12/05/09 02:06:10 INFO mapred.JobClient:     Shutdown (milliseconds)=232
12/05/09 02:06:10 INFO mapred.JobClient:     Vertex input superstep (milliseconds)=13393
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 0 (milliseconds)=5610
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 4 (milliseconds)=8473
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 5 (milliseconds)=1844
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 2 (milliseconds)=7418
12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 1 (milliseconds)=9612

These were median runs. Overall runtime improved from 167722 ms to 57795 ms with Netty (2.9x faster), and vertex loading improved from 51025 ms to 13393 ms (3.8x faster).  More results will follow tomorrow; I expect the improvement to exceed 3x for bigger runs.
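The per-phase speedups can be computed directly from the two timer dumps above. A small Java sketch (the class name is illustrative); it also makes visible that Setup and Shutdown were slightly slower with Netty, so the gains come from vertex loading and the supersteps.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Computes per-phase speedups (Hadoop RPC time / Netty time) from the
// Giraph timer values reported above. Values > 1.0 mean Netty was faster.
public class TimerSpeedups {

  static double speedup(long rpcMs, long nettyMs) {
    return (double) rpcMs / nettyMs;
  }

  public static void main(String[] args) {
    // phase -> {Hadoop RPC ms, Netty ms}, copied from the two runs.
    Map<String, long[]> timers = new LinkedHashMap<>();
    timers.put("Setup", new long[] {2930, 3574});
    timers.put("Vertex input superstep", new long[] {51025, 13393});
    timers.put("Superstep 0", new long[] {21543, 5610});
    timers.put("Superstep 1", new long[] {20054, 9612});
    timers.put("Superstep 2", new long[] {24507, 7418});
    timers.put("Superstep 3", new long[] {24775, 7636});
    timers.put("Superstep 4", new long[] {19858, 8473});
    timers.put("Superstep 5", new long[] {2844, 1844});
    timers.put("Shutdown", new long[] {181, 232});
    timers.put("Total", new long[] {167722, 57795});

    for (Map.Entry<String, long[]> e : timers.entrySet()) {
      long[] ms = e.getValue();
      System.out.printf("%-24s %8d -> %7d ms  %.2fx%n",
          e.getKey(), ms[0], ms[1], speedup(ms[0], ms[1]));
    }
  }
}
```

With these numbers, Total comes out at about 2.90x and vertex input at about 3.81x, matching the figures quoted above, while Setup comes out at about 0.82x.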


Thanks,

Avery


Re: Review Request: Implemented a Netty client/server protocol as a faster alternative to Hadoop RPC (3x improvement)

Posted by Sebastian Schelter <ss...@apache.org>.
I agree that we should try not to hand out the map. Instead, there should
be a method to get a single entry, a method to get an Iterable over all
entries, and a method to clear it.

--sebastian
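The suggestion above can be sketched as a small owner class that never exposes its map: a single-entry lookup, an Iterable over all entries, and a clear(), with all synchronization kept inside. This is a hypothetical illustration (EncapsulatedStore and its methods are made-up names), not the refactoring that was eventually committed.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical owner of a shared map. Callers never receive the map itself,
 * so every mutation goes through this class and all locking stays here.
 */
public class EncapsulatedStore<K, V> {
  private final Map<K, V> map = new HashMap<>();

  /** Adds or replaces the value for a key. */
  public synchronized void put(K key, V value) {
    map.put(key, value);
  }

  /** Returns the value for a single key, or null if absent. */
  public synchronized V get(K key) {
    return map.get(key);
  }

  /**
   * Returns a read-only snapshot of all entries. Callers can iterate it
   * without holding the lock and cannot modify the underlying map.
   */
  public synchronized Iterable<Map.Entry<K, V>> entries() {
    List<Map.Entry<K, V>> snapshot = new ArrayList<>(map.size());
    for (Map.Entry<K, V> e : map.entrySet()) {
      snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e));
    }
    return Collections.unmodifiableList(snapshot);
  }

  /** Removes all entries, e.g. once a superstep's data has been consumed. */
  public synchronized void clear() {
    map.clear();
  }
}
```

If callers typically iterate and then clear, a single method that returns the snapshot and clears under the same lock would avoid losing entries added between the two calls.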

On 09.05.2012 18:28, Avery Ching wrote:
> 
> 
>> On 2012-05-09 10:10:46, Sebastian Schelter wrote:
>>> http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java, line 1465
>>> <https://reviews.apache.org/r/5074/diff/2/?file=108120#file108120line1465>
>>>
>>>     I don't like that a collection is modified outside of the class that owns it.
>>>     
>>>     This makes the code hard to read and debug. Instead, we should add a method to the class that owns this map so that all mutations happen in one place.
> 
> Good point, it's a little hard to understand.  Since this is a Map, we can do as you suggested: keep it in a class and add a method that does the clear().  We can also move the code that iterates over the map into that class, so no synchronization is needed outside of it.  I'll do this for all our synchronized objects in the next patch, if that's okay with you (the existing code follows the same pattern).  It will be a medium-sized change.
> 
> 
> - Avery
> 
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/5074/#review7728
> -----------------------------------------------------------
> 
> 


Re: Review Request: Implemented a Netty client/server protocol as a faster alternative to Hadoop RPC (3x improvement)

Posted by Avery Ching <av...@gmail.com>.

> On 2012-05-09 10:10:46, Sebastian Schelter wrote:
> > http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java, line 1465
> > <https://reviews.apache.org/r/5074/diff/2/?file=108120#file108120line1465>
> >
> >     I don't like that a collection is modified outside of the class that owns it.
> >     
> >     This makes the code hard to read and debug. Instead, we should add a method to the class that owns this map so that all mutations happen in one place.

Good point, it's a little hard to understand.  Since this is a Map, we can do as you suggested: keep it in a class and add a method that does the clear().  We can also move the code that iterates over the map into that class, so no synchronization is needed outside of it.  I'll do this for all our synchronized objects in the next patch, if that's okay with you (the existing code follows the same pattern).  It will be a medium-sized change.


- Avery


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/5074/#review7728
-----------------------------------------------------------




Re: Review Request: Implemented a Netty client/server protocol as a faster alternative to Hadoop RPC (3x improvement)

Posted by Sebastian Schelter <ss...@apache.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/5074/#review7728
-----------------------------------------------------------

Ship it!


I went through the code (although I don't have much experience with networking code), and everything looks good.

I tested this patch by computing the connected components of the undirected Wikipedia pagelink graph (6M vertices, 250M edges) on a 6-machine cluster. Everything went fine, and I even saw a small improvement in runtime, although the job only takes 4 minutes.




http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
<https://reviews.apache.org/r/5074/#comment17027>

    I don't like that a collection is modified outside of the class that owns it.
    
    This makes the code hard to read and debug. Instead, we should add a method to the class that owns this map so that all mutations happen in one place.


- Sebastian


On 2012-05-09 09:22:36, Avery Ching wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/5074/
> -----------------------------------------------------------
> 
> (Updated 2012-05-09 09:22:36)
> 
> 
> Review request for giraph.
> 
> 
> Summary
> -------
> 
> * Implemented a request/response protocol with netty as a NettyClient and NettyServer.  There is a NettyClientWorker and NettyClientServer that implements WorkerClient and WorkerServer, respectively.  Netty is a lot faster since it's non-blocking and we can interleave computation and communication as opposed to Hadoop RPC (blocking).
> * The netty server implementation uses concurrent hash maps to improved concurrency instead of synchronized blocks around maps.
> * By default netty is used, but Hadoop RPC can be used with -Dgiraph.useNetty=false
> * Changed the class hierarchy of ServerInterface to WorkerClientServer (WorkerClient and WorkerServer) to support a request/response protocol instead of just RPC
> * In netty, the messages/mutations are gathered by partition and send out as a partition's worth of messages/mutations
> * Added two new test classes (RequestTest.java and ConnectionTest.java) to test all requests and check netty connections.
> * PageRankBenchmark uses EdgeListVertex by default.
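A minimal sketch of two of the ideas in the list above: buffering outgoing messages per destination partition on the client, and storing them on the server in a ConcurrentHashMap rather than behind a synchronized block. All class and method names here are hypothetical and not taken from the patch; hash partitioning by vertex id, double-valued messages, and a single computing thread per client cache are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical client-side cache: buffers outgoing messages by destination
 * partition so one request can carry a whole partition's worth of messages.
 * Assumes a single computing thread uses it, so plain HashMaps suffice.
 */
class PartitionMessageCache {
  private final int numPartitions;
  // partition id -> (destination vertex id -> messages)
  private final Map<Integer, Map<Long, List<Double>>> buffers =
      new HashMap<Integer, Map<Long, List<Double>>>();

  PartitionMessageCache(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  /** Simple hash partitioning, assumed here for illustration. */
  int partitionOf(long vertexId) {
    return (int) Math.floorMod(vertexId, (long) numPartitions);
  }

  void add(long destVertexId, double message) {
    buffers.computeIfAbsent(partitionOf(destVertexId), p -> new HashMap<Long, List<Double>>())
        .computeIfAbsent(destVertexId, v -> new ArrayList<Double>())
        .add(message);
  }

  /** Removes one partition's buffered messages so they can be sent as one request. */
  Map<Long, List<Double>> drainPartition(int partitionId) {
    Map<Long, List<Double>> batch = buffers.remove(partitionId);
    return batch == null ? new HashMap<Long, List<Double>>() : batch;
  }
}

/**
 * Hypothetical server-side store: several request-handler threads may add
 * batches at once; ConcurrentHashMap replaces a synchronized block around the map.
 */
class ServerMessageStore {
  private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Double>> messages =
      new ConcurrentHashMap<Long, ConcurrentLinkedQueue<Double>>();

  void addPartitionBatch(Map<Long, List<Double>> batch) {
    for (Map.Entry<Long, List<Double>> entry : batch.entrySet()) {
      // computeIfAbsent is atomic on ConcurrentHashMap, so two threads cannot
      // both create a queue for the same vertex.
      messages.computeIfAbsent(entry.getKey(), k -> new ConcurrentLinkedQueue<Double>())
          .addAll(entry.getValue());
    }
  }

  int messageCount(long vertexId) {
    ConcurrentLinkedQueue<Double> queue = messages.get(vertexId);
    return queue == null ? 0 : queue.size();
  }

  public static void main(String[] args) {
    PartitionMessageCache cache = new PartitionMessageCache(2);
    cache.add(1L, 0.5);
    cache.add(2L, 0.25);
    cache.add(3L, 1.0);
    cache.add(3L, 2.0);
    ServerMessageStore server = new ServerMessageStore();
    // Vertices 1 and 3 hash to partition 1 and travel together in one batch.
    server.addPartitionBatch(cache.drainPartition(1));
    System.out.println(server.messageCount(3L)); // 2
    System.out.println(server.messageCount(2L)); // 0 (partition 0 not sent yet)
  }
}
```

The design point is that the client pays one request per partition instead of one per message, while the server only contends on individual map entries rather than on a single lock for the whole map.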
> 
> 
> This addresses bug GIRAPH-37.
>     https://issues.apache.org/jira/browse/GIRAPH-37
> 
> 
> Diffs
> -----
> 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/examples/SimpleShortestPathVertexTest.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestJsonBase64Format.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestManualCheckpoint.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphState.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexMutations.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WritableRequest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClientServer.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerCommunications.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerClient.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerInterface.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendVertexRequest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMutationsRequest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMutationsCache.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestServerHandler.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ResponseClientHandler.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestEncoder.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestDecoder.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyClient.java PRE-CREATION 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/pom.xml 1332888 
>   http://svn.apache.org/repos/asf/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java 1332888 
> 
> Diff: https://reviews.apache.org/r/5074/diff
> 
> 
> Testing
> -------
> 
> 'mvn verify' passes.  I ran several test runs to gather performance results.  Here is a simple example:
> 
> Hadoop RPC:
> hadoop jar ~/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.useNetty=false -w 5 -V 5000000 -s 5 -e 2 -v
> 
> 12/05/09 01:59:56 INFO mapred.JobClient:   Giraph Timers
> 12/05/09 01:59:56 INFO mapred.JobClient:     Total (milliseconds)=167722
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 3 (milliseconds)=24775
> 12/05/09 01:59:56 INFO mapred.JobClient:     Setup (milliseconds)=2930
> 12/05/09 01:59:56 INFO mapred.JobClient:     Shutdown (milliseconds)=181
> 12/05/09 01:59:56 INFO mapred.JobClient:     Vertex input superstep (milliseconds)=51025
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 0 (milliseconds)=21543
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 4 (milliseconds)=19858
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 5 (milliseconds)=2844
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 2 (milliseconds)=24507
> 12/05/09 01:59:56 INFO mapred.JobClient:     Superstep 1 (milliseconds)=20054
> 
> Netty:
> hadoop jar ~/giraph-0.2-SNAPSHOT-jar-with-dependencies.jar org.apache.giraph.benchmark.PageRankBenchmark -Dgiraph.useNetty=true -w 5 -V 5000000 -s 5 -e 2 -v
> 
> 12/05/09 02:06:10 INFO mapred.JobClient:   Giraph Timers
> 12/05/09 02:06:10 INFO mapred.JobClient:     Total (milliseconds)=57795
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 3 (milliseconds)=7636
> 12/05/09 02:06:10 INFO mapred.JobClient:     Setup (milliseconds)=3574
> 12/05/09 02:06:10 INFO mapred.JobClient:     Shutdown (milliseconds)=232
> 12/05/09 02:06:10 INFO mapred.JobClient:     Vertex input superstep (milliseconds)=13393
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 0 (milliseconds)=5610
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 4 (milliseconds)=8473
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 5 (milliseconds)=1844
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 2 (milliseconds)=7418
> 12/05/09 02:06:10 INFO mapred.JobClient:     Superstep 1 (milliseconds)=9612
> 
> These were median runs. The overall runtime improved from 167722 ms to 57795 ms with Netty (2.9x faster). Loading the vertices improved from 51025 ms to 13393 ms (3.8x faster). More results will follow tomorrow; for bigger runs, I expect the improvement to be even greater than 3x.
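To check the ratios quoted above and see where the time went, a small sketch that recomputes the per-timer speedup (Hadoop RPC time divided by Netty time) from the two timer dumps. The numbers are copied from the logs; the class and method names are only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Recomputes speedups from the two sets of Giraph timers in the logs. */
class TimerSpeedups {
  /** Ratio of baseline to new time; values above 1 mean the new run was faster. */
  static double speedup(long baselineMs, long newMs) {
    return (double) baselineMs / newMs;
  }

  public static void main(String[] args) {
    // {Hadoop RPC ms, Netty ms}, copied from the two timer dumps.
    Map<String, long[]> timers = new LinkedHashMap<String, long[]>();
    timers.put("Total", new long[] {167722, 57795});
    timers.put("Setup", new long[] {2930, 3574});
    timers.put("Vertex input superstep", new long[] {51025, 13393});
    timers.put("Superstep 0", new long[] {21543, 5610});
    timers.put("Superstep 1", new long[] {20054, 9612});
    timers.put("Superstep 2", new long[] {24507, 7418});
    timers.put("Superstep 3", new long[] {24775, 7636});
    timers.put("Superstep 4", new long[] {19858, 8473});
    timers.put("Superstep 5", new long[] {2844, 1844});
    timers.put("Shutdown", new long[] {181, 232});
    for (Map.Entry<String, long[]> e : timers.entrySet()) {
      long[] t = e.getValue();
      System.out.printf("%-24s %7d -> %6d ms  %.2fx%n",
          e.getKey(), t[0], t[1], speedup(t[0], t[1]));
    }
  }
}
```

On these numbers the total comes out at about 2.90x and vertex loading at about 3.81x, while Setup (about 0.82x) and Shutdown (about 0.78x) are slightly slower with Netty in this pair of runs, so the gain comes from input loading and the supersteps.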
> 
> 
> Thanks,
> 
> Avery
> 
>