Posted to dev@giraph.apache.org by Maja Kabiljo <ma...@fb.com> on 2012/10/20 04:54:27 UTC

Review Request: Remove aggregator handling from Zookeeper

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7673/
-----------------------------------------------------------

Review request for giraph.


Description
-------

This patch follows some of the discussion on GIRAPH-273. Here is a brief description of how all aggregation works now:

- For each aggregator, we determine which worker owns it using the hash code of the aggregator name.
- At the end of the superstep, each worker first sends the values its vertices aggregated to the owners of the aggregators. (SendWorkerAggregatorsRequest)
- After receiving all these partial values and aggregating them together, each worker sends the final aggregated values of the aggregators it owns to the master. (SendAggregatorsToMasterRequest)
- The master gets all aggregated values, runs master.compute, and later sends the aggregators to their owners. (SendAggregatorsToOwnerRequest)
- When a worker receives its aggregators from the master, it distributes them further to all other workers. (SendAggregatorsToWorkerRequest)
- When a worker has received aggregators from all workers, it is ready to proceed with the computation.
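
To illustrate the ownership rule in the first step, here is a minimal Java sketch. The class and method names are hypothetical and not taken from the patch, and the patch may map hash codes to workers differently; the sketch only assumes that the owner is derived from the hash code of the aggregator name.

```java
/** Hypothetical helper (not from the patch): maps an aggregator name to a worker index. */
final class AggregatorOwnership {
  private AggregatorOwnership() { }

  /**
   * Picks the owning worker from the hash code of the aggregator name.
   * Math.floorMod keeps the result in [0, numWorkers) even when the
   * hash code is negative.
   */
  static int ownerIndex(String aggregatorName, int numWorkers) {
    if (numWorkers <= 0) {
      throw new IllegalArgumentException("numWorkers must be positive");
    }
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  public static void main(String[] args) {
    String[] names = {"sum", "max", "count"};
    for (String name : names) {
      System.out.println(name + " -> worker " + ownerIndex(name, 4));
    }
  }
}
```

Every worker can evaluate this function locally, so under this assumption no coordination is needed to agree on who owns which aggregator.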

In order to avoid any additional barrier, workers count the number of requests of each type they have received, so they know (independently of each other) when they can go to the next superstep.
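
The counting idea can be sketched roughly as follows. This is a simplified illustration, not the code in the patch: the RequestCounter class, its Kind enum, and its method names are all hypothetical, and the expected count is assumed to be known to the caller (for example, the number of workers).

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch: count received requests per type instead of using an extra barrier. */
final class RequestCounter {
  /** Illustrative request kinds, loosely named after the requests described above. */
  enum Kind { WORKER_AGGREGATORS, AGGREGATORS_TO_WORKER }

  private final Map<Kind, Integer> received = new EnumMap<>(Kind.class);

  /** Records one received request and wakes up any thread waiting in awaitCount. */
  synchronized void onRequest(Kind kind) {
    received.merge(kind, 1, Integer::sum);
    notifyAll();
  }

  /**
   * Blocks until at least `expected` requests of this kind have arrived.
   * The expected amount is then subtracted (rather than resetting to zero),
   * so requests that already arrived for the next superstep are not lost.
   */
  synchronized void awaitCount(Kind kind, int expected) {
    try {
      while (received.getOrDefault(kind, 0) < expected) {
        wait();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for requests", e);
    }
    received.merge(kind, -expected, Integer::sum);
  }

  /** Number of requests of this kind received but not yet consumed by awaitCount. */
  synchronized int pending(Kind kind) {
    return received.getOrDefault(kind, 0);
  }
}
```

Each worker only inspects its own counters, which is what lets it decide locally when to move on.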

On the master everything is kept in MasterAggregatorHandler; on the worker we have three classes:
- WorkerAggregatorHandler is used by vertex.compute - it provides the values for getAggregatedValue, and holds the values into which we aggregate.
- OwnerAggregatorServerData - here we keep aggregating the partial aggregated values received from other workers, for the aggregators which we own.
- AllAggregatorServerData - this is used to receive the aggregators from the previous superstep, both from the master and from the owner workers.
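
As a rough illustration of the owner-side role, the sketch below combines partial values for the aggregators a worker owns. It is a deliberately simplified stand-in and not the API of OwnerAggregatorServerData: the class name, the methods, and the restriction to long values with a binary operator are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/** Hypothetical, simplified stand-in for the data an owner keeps for its aggregators. */
final class OwnerDataSketch {
  private final Map<String, LongBinaryOperator> operators = new HashMap<>();
  private final Map<String, Long> values = new HashMap<>();

  /** Registers an aggregator this worker owns, with its combine operation and initial value. */
  synchronized void register(String name, LongBinaryOperator op, long initialValue) {
    operators.put(name, op);
    values.put(name, initialValue);
  }

  /** Folds one partial value received from some worker into the owned aggregator. */
  synchronized void aggregate(String name, long partialValue) {
    LongBinaryOperator op = operators.get(name);
    if (op == null) {
      throw new IllegalArgumentException("Unknown aggregator: " + name);
    }
    values.put(name, op.applyAsLong(values.get(name), partialValue));
  }

  /** Returns the value aggregated so far; final once all partial values have arrived. */
  synchronized long currentValue(String name) {
    Long value = values.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Unknown aggregator: " + name);
    }
    return value;
  }
}
```

The methods are synchronized because, in this sketch, partial values are assumed to arrive on several network threads at once.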

I know it's a huge patch, but I'd really appreciate it if someone finds time to take a look :-) Would love to hear your comments/suggestions.

Note: When there are no aggregators, or there are just a few small ones, on our cluster there was absolutely no time overhead with this change. That's why I didn't want to complicate it even more by adding another implementation which still uses Zookeeper, or which skips some of the described steps. Of course, if someone finds a need for it, it can be added later.

Another possible improvement is to have something like a dictionary of all the aggregator classes which are used, so that we don't need to send the full name of the aggregator class with each aggregator. This matters only when we have a lot of small aggregators, so again it can be added if the need arises.
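
A minimal sketch of what such a dictionary could look like, purely as an illustration of the idea: nothing like this is in the patch, the class and method names are hypothetical, and the sketch assumes that sender and receiver would somehow share the same id assignment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical dictionary that replaces aggregator class names with small integer ids. */
final class ClassNameDictionary {
  private final Map<String, Integer> idsByName = new HashMap<>();
  private final List<String> namesById = new ArrayList<>();

  /** Returns the id for a class name, assigning the next free id on first use. */
  int idFor(String className) {
    Integer id = idsByName.get(className);
    if (id == null) {
      id = namesById.size();
      idsByName.put(className, id);
      namesById.add(className);
    }
    return id;
  }

  /** Looks up the class name for a previously assigned id. */
  String nameFor(int id) {
    return namesById.get(id);
  }
}
```

With this, each aggregator would carry a 4-byte id instead of a full class name, at the cost of distributing the dictionary once.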

Also, one thing to improve in the future is to have per-thread local copies of aggregators, so we could avoid synchronization there.
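
The per-thread idea might look roughly like the sketch below, shown for a simple sum. This is only an assumption about how it could be done, not part of the patch; the class and method names are hypothetical. Each thread accumulates into its own slot without locking, and the slots are merged after the threads have finished.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: per-thread partial sums merged once at the end, with no locking. */
final class PerThreadSumSketch {
  private PerThreadSumSketch() { }

  static long sumWithThreadLocalCopies(long[] values, int numThreads) {
    long[] partial = new long[numThreads]; // one slot per thread
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < numThreads; t++) {
      final int id = t;
      Thread thread = new Thread(() -> {
        long local = 0; // thread-local copy of the aggregator
        for (int i = id; i < values.length; i += numThreads) {
          local += values[i];
        }
        partial[id] = local; // written once, by this thread only
      });
      threads.add(thread);
      thread.start();
    }
    long total = 0;
    for (int t = 0; t < numThreads; t++) {
      try {
        threads.get(t).join(); // join makes the thread's write visible here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while merging", e);
      }
      total += partial[t];
    }
    return total;
  }
}
```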


This addresses bug GIRAPH-273.
    https://issues.apache.org/jira/browse/GIRAPH-273


Diffs
-----

  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java 1400335 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java 1400335 

Diff: https://reviews.apache.org/r/7673/diff/


Testing
-------

mvn clean verify, tests in pseudo-distributed mode.
AggregatorsBenchmark (which also checks for correctness) with various numbers of aggregators and workers.
Tested on an fb application which uses a lot of big aggregators, and also tested it with multithreading.


Thanks,

Maja Kabiljo


Re: Review Request: Remove aggregator handling from Zookeeper

Posted by Maja Kabiljo <ma...@fb.com>.

(Updated Oct. 25, 2012, 9:23 p.m.)


Review request for giraph.


Changes
-------

I'm uploading a new diff after an internal review. There are no significant functional changes, just some style and documentation improvements. It still passes all the tests.


Diffs (updated)
-----

  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java 1402331 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java PRE-CREATION 
  http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java 1402331 

Diff: https://reviews.apache.org/r/7673/diff/


Thanks,

Maja Kabiljo