Posted to dev@giraph.apache.org by Maja Kabiljo <ma...@fb.com> on 2012/08/01 20:43:10 UTC

Re: Review Request: Out-of-core messages

I've been benchmarking this solution; the results are in the attached
Excel document. It covers PageRankBenchmark and RandomMessagesBenchmark.
Sheets 'Page Rank 3', 'Page Rank 4' and 'Messages 3' show the cases in
which we run out of memory. The Shortest Paths algorithm uses very few
messages compared to the amount of other data, so there I couldn't see
any difference between the solutions. The interesting cases are 'Page
Rank 2' and 'Messages 2', where I guess we are very tight on memory, so
going out of core helps (I've rerun those a few times since and keep
getting the same results). We can also see that execution time improves
with just SimpleMessageStore, since the current implementation copies
messages around when it stores them in the vertex.

I also tried running RandomMessagesBenchmark with a really huge number of
messages, but it crashed because the message store didn't process
messages fast enough and the worker got flooded with unprocessed requests.
In cases like that, the only thing that could help would be to slow down
the compute executions. But I think this shouldn't happen in real
applications: this benchmark doesn't use the received messages at all,
whereas in a real application the executions will be slower anyway if
they have to process that much data. Still, it would be good to have a
real problem that uses messages intensively, so we could see what's
really going on.

To conclude: as a first step, maybe I can create a smaller patch from
this that only adds SimpleMessageStore, since as we can see keeping
messages outside of the vertices helps. Then, once the RPC is removed, we
will finally be able to remove the putMessages/getMessages/getNumMessages
functions from Vertex. As for the out-of-core part, as long as it stays
optional and is not used by default, I see no harm in adding it as well,
and as you can see there are benefits in some cases.

One more thing: I should explain which parts of the GIRAPH-45 discussion
I'm actually using here, since I don't use bloom filters or BTrees. It
works as follows:
- Inside the outer message store we have a separate message store for each
partition.
- Partition message stores keep their data in an ordered map (ordered by
vertex id).
- In the outer message store we check whether we should flush something
(i.e. whether we have more than the allowed number of messages in memory).
While we do, we flush the partition with the largest number of messages in
memory.
- When a partition message store is flushed, all of its data is written to
a file in order of vertex id. The file content looks like:
num_vertices
vertex_1_id num_messages_1 message_1_1 message_1_2 ...
vertex_2_id num_messages_2 message_2_1 message_2_2 ...
...
- In the end each partition has some messages in memory and N files, where
N is the number of times it was flushed.
- When it's time to do the computation, within a single partition we call
the compute methods in order of vertex id.
- We use buffered streams and read the data from all of the partition's
files sequentially, since we need the data in the same order in which it
is written in each file. This limits the number of random file accesses.
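
To make the steps above concrete, here is a minimal sketch in Java. It is
only an illustration under stated assumptions, not the code from the patch:
the names OutOfCoreMessagesSketch, PartitionStore, FileCursor and
forEachVertex are made up, messages are plain strings, vertex ids are longs,
and byte arrays stand in for the files on disk (real files would be wrapped
in buffered streams).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Illustrative sketch of the scheme described above; hypothetical names throughout. */
class OutOfCoreMessagesSketch {
  /** One partition: in-memory messages ordered by vertex id, plus its flushed "files". */
  static class PartitionStore {
    final TreeMap<Long, List<String>> inMemory = new TreeMap<>();
    final List<byte[]> files = new ArrayList<>(); // assumption: byte arrays stand in for disk files
    int count = 0; // number of messages currently held in memory
    void add(long vertexId, String message) {
      inMemory.computeIfAbsent(vertexId, id -> new ArrayList<>()).add(message);
      count++;
    }

    /** Writes num_vertices, then one record per vertex in ascending vertex id order. */
    int flush() throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(inMemory.size());
      for (Map.Entry<Long, List<String>> e : inMemory.entrySet()) {
        out.writeLong(e.getKey());
        out.writeInt(e.getValue().size());
        for (String m : e.getValue()) out.writeUTF(m);
      }
      files.add(bytes.toByteArray());
      int flushed = count;
      inMemory.clear();
      count = 0;
      return flushed;
    }
  }

  /** Reads one flushed file front to back; callers must ask for ascending vertex ids. */
  static class FileCursor {
    final DataInputStream in;
    int verticesLeft;
    long nextId;
    FileCursor(byte[] file) throws IOException {
      in = new DataInputStream(new ByteArrayInputStream(file));
      verticesLeft = in.readInt();
      if (verticesLeft > 0) nextId = in.readLong();
    }
    /** Appends the messages stored for vertexId, skipping records with smaller ids. */
    void collect(long vertexId, List<String> into) throws IOException {
      while (verticesLeft > 0 && nextId <= vertexId) {
        int n = in.readInt();
        for (int i = 0; i < n; i++) {
          String m = in.readUTF();
          if (nextId == vertexId) into.add(m);
        }
        verticesLeft--;
        if (verticesLeft > 0) nextId = in.readLong();
      }
    }
  }

  final Map<Integer, PartitionStore> partitions = new TreeMap<>();
  final int maxInMemory;
  int totalInMemory = 0;

  OutOfCoreMessagesSketch(int maxInMemory) { this.maxInMemory = Math.max(0, maxInMemory); }

  /** Stores a message; while over the limit, flushes the partition holding the most messages. */
  void addMessage(int partitionId, long vertexId, String message) throws IOException {
    partitions.computeIfAbsent(partitionId, id -> new PartitionStore()).add(vertexId, message);
    totalInMemory++;
    while (totalInMemory > maxInMemory) {
      PartitionStore largest = Collections.max(partitions.values(),
          Comparator.comparingInt((PartitionStore p) -> p.count));
      totalInMemory -= largest.flush();
    }
  }

  /** Calls compute for each vertex id in ascending order, reading every file sequentially. */
  void forEachVertex(int partitionId, Iterable<Long> ascendingIds,
      BiConsumer<Long, List<String>> compute) throws IOException {
    PartitionStore store = partitions.getOrDefault(partitionId, new PartitionStore());
    List<FileCursor> cursors = new ArrayList<>();
    for (byte[] file : store.files) cursors.add(new FileCursor(file));
    for (long id : ascendingIds) {
      List<String> messages = new ArrayList<>();
      for (FileCursor c : cursors) c.collect(id, messages);
      messages.addAll(store.inMemory.getOrDefault(id, Collections.emptyList()));
      compute.accept(id, messages);
    }
  }
}
```

Because every file and the in-memory map are ordered by vertex id, and
compute is called in the same order, each file is read exactly once from
start to end. That is the property the last bullet relies on.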

Maja




On 7/24/12 1:45 AM, "Avery Ching" <ac...@apache.org> wrote:

>We should integrate the partitioning of the graph into the input
>superstep to get locality as well.  We can use MapReduce to try and
>schedule the map tasks (workers) closest to its data and then make the
>workers smart enough to only try to load their partitions.
>
>On 7/22/12 4:30 PM, Claudio Martella wrote:
>> I see your reasoning. In general I'm being open to use MR when
>> necessary (e.g. i used to propose it instead of the automatic vertex
>> creation), here it could get tricky. I can see additional HDFS usage
>> as down (you have to be able to store 2x the graph). However, once the
>> graph is pre-filtered, this additional stage would not be necessary
>> again for the successive jobs (only when a different number of workers
>> is used). Though, it does add a not so small passage to the process.
>>
>> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta <al...@fb.com>
>>wrote:
>>> Exactly. On paper, the amount of data around should be the same as
>>>during
>>> the computation, but in practice we do use a lot more memory.
>>> You can play with the settings and just push the problem a little
>>>farther
>>> away, by caching less and flushing requests more frequently, so then
>>>the
>>> bottleneck is on the servers.
>>> We're basically sending (k-1)/k of the graph through the network,
>>>where k
>>> is the number of workers.
>>>
>>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
>>>MapReduce is
>>> really good at (sorting and aggregating) in a probably inefficient (or
>>>at
>>> least non-scalable) way.
>>> We could try implementing it with a MapReduce job instead, where the
>>> mappers take input splits and emit (partition_id, vertex) (they would
>>>have
>>> access to the partitioner) and reducers just output the built
>>>partitions
>>> to HDFS.
>>> The computation stage would then be the usual Giraph job, where each
>>> worker knows where to get its partitions from HDFS.
>>> I can try making this change and see how it goes. It would just be one
>>>MR
>>> job, so we're not selling our souls to iterative MR.
>>>
>>> I can also see many cases where one might not want to shuffle vertices
>>> around at all: each worker reads a roughly equal part of the input
>>>(forget
>>> about bigger vertices for now) and simply communicates its own vertex
>>>ids
>>> to the master. Partition "a posteriori" instead of "a priori".
>>>
>>> What do you think?
>>>
>>> On 7/20/12 9:42 PM, "Eli Reisman" <in...@gmail.com> wrote:
>>>
>>>> What we are seeing in the metrics is the three-way load of
>>>>
>>>> 1. reading InputSplits from HDFS (mostly over the wire as there is no
>>>> locality right now)
>>>> 2. creating temporary collections of vertices, sending them on netty
>>>> 3. simultaneously receiving collections of vertices on netty from
>>>>remote
>>>> nodes that will be place in the local workers' partitions for
>>>>processing
>>>> stages
>>
>>


Re: Review Request: Out-of-core messages

Posted by Maja Kabiljo <ma...@fb.com>.
I've been looking further into the case where we run out of memory even
with out-of-core messages, and here is what I found. I count how many
messages the worker has sent by incrementing a counter in
SendPartitionMessagesRequest.write(), and how many it has received by
incrementing another counter in
SendPartitionMessagesRequest.readFields(). Even on the smaller examples
these two numbers differ significantly during the superstep (at the end
of the superstep they are about the same, of course).

One example: I ran RandomMessageBenchmark with 50k vertices, 100 edges
per vertex, 100 messages per edge, message size 0.5KB, 2 workers and 16GB
per worker. Just before the crash, the number of messages to be sent is
around 28M, while the number of received messages is only 8M. I don't
know the details of how Netty is implemented, but I suppose it has to
keep sent messages until it receives confirmation from the destination,
and that this is why we run out of memory. I was able to fix this by
adding an occasional nettyClient.waitAllRequests() call, and a problem of
this size then finished successfully! These calls do slow the algorithm
down a bit, so I'll create a patch that adds this as an option that is
not used by default. After that, by tuning the out-of-core settings and
this parameter, we should be able to run jobs with any amount of message
data.
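
A rough sketch of that idea in Java follows. It is not Giraph's NettyClient:
OpenRequestTracker, send, requestAcked and waitEvery are hypothetical names,
and the confirmation is simulated by whoever calls requestAcked(). The sender
counts requests that have been sent but not yet confirmed and, after every
waitEvery requests, blocks until all of them are confirmed, so at most
waitEvery requests have to be kept around at once. For scale, taking the
numbers above at face value, about 28M - 8M = 20M messages of 0.5KB each
were outstanding, which is on the order of 10GB against a 16GB worker.

```java
/** Illustrative sketch only; hypothetical names, not Giraph's NettyClient. */
class OpenRequestTracker {
  private final int waitEvery; // wait for all confirmations after this many sends
  private int open = 0;        // requests sent but not yet confirmed
  private long sent = 0;       // total requests sent so far
  private int maxOpen = 0;     // high-water mark of open, for inspection

  OpenRequestTracker(int waitEvery) {
    if (waitEvery <= 0) {
      throw new IllegalArgumentException("waitEvery must be positive");
    }
    this.waitEvery = waitEvery;
  }

  /** Registers a request, hands it to the wire, and occasionally waits for all confirmations. */
  void send(Runnable writeToWire) throws InterruptedException {
    boolean shouldWait;
    synchronized (this) {
      open++;
      sent++;
      maxOpen = Math.max(maxOpen, open);
      shouldWait = sent % waitEvery == 0;
    }
    // The request is written outside the lock so confirmations are never blocked by a sender.
    writeToWire.run();
    if (shouldWait) {
      waitAllRequests();
    }
  }

  /** Called by the network thread when the destination confirms a request. */
  synchronized void requestAcked() {
    open--;
    notifyAll();
  }

  /** Blocks until every request sent so far has been confirmed. */
  synchronized void waitAllRequests() throws InterruptedException {
    while (open > 0) {
      wait();
    }
  }

  synchronized int open() { return open; }

  synchronized int maxOpen() { return maxOpen; }
}
```

A smaller waitEvery bounds memory more tightly but stalls the sender more
often, which matches the slowdown mentioned above and is why it makes sense
as a tunable option.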

On 8/4/12 12:16 AM, "Eli Reisman" <in...@gmail.com> wrote:

>I like the idea of keeping the messages out of the vertices there is a lot
>of unneeded data copying/GC going on and if this eliminates some that
>would
>be fantastic and I think a big help through the whole job run, memory
>wise.
>
>On Fri, Aug 3, 2012 at 4:03 AM, Gianmarco De Francisci Morales <
>gdfm@apache.org> wrote:
>
>> Hi,
>>
>> >Are you saying that out-of-core is faster that hitting memory
>>boundaries
>> > >(i.e. GC)?  It is a bit tough to imagine that out-of-core beats
>>in-core
>> > >=).
>> >
>> > That's the only explanation I could think of, honestly it sounds
>>wrong to
>> > me too. But those are the results I keep getting. If someone has a
>>better
>> > one I'd love to hear it :-)
>>
>>
>> I am not surprised.
>> Streaming sequentially from a disk is faster than random reading from
>> memory [1].
>> Add the GC overhead, and you get an explanation for your results.
>>
>> [1] The Pathologies of Big Data,
>> http://queue.acm.org/detail.cfm?id=1563874
>>
>> Cheers,
>> --
>> Gianmarco
>>


Re: Review Request: Out-of-core messages

Posted by Eli Reisman <in...@gmail.com>.
I like the idea of keeping the messages out of the vertices. There is a lot
of unneeded data copying/GC going on, and if this eliminates some of it that
would be fantastic, and I think a big help memory-wise through the whole job
run.

On Fri, Aug 3, 2012 at 4:03 AM, Gianmarco De Francisci Morales <
gdfm@apache.org> wrote:

> Hi,
>
> >Are you saying that out-of-core is faster that hitting memory boundaries
> > >(i.e. GC)?  It is a bit tough to imagine that out-of-core beats in-core
> > >=).
> >
> > That's the only explanation I could think of, honestly it sounds wrong to
> > me too. But those are the results I keep getting. If someone has a better
> > one I'd love to hear it :-)
>
>
> I am not surprised.
> Streaming sequentially from a disk is faster than random reading from
> memory [1].
> Add the GC overhead, and you get an explanation for your results.
>
> [1] The Pathologies of Big Data,
> http://queue.acm.org/detail.cfm?id=1563874
>
> Cheers,
> --
> Gianmarco
>

Re: Review Request: Out-of-core messages

Posted by Gianmarco De Francisci Morales <gd...@apache.org>.
Hi,

>Are you saying that out-of-core is faster that hitting memory boundaries
> >(i.e. GC)?  It is a bit tough to imagine that out-of-core beats in-core
> >=).
>
> That's the only explanation I could think of, honestly it sounds wrong to
> me too. But those are the results I keep getting. If someone has a better
> one I'd love to hear it :-)


I am not surprised.
Streaming sequentially from a disk is faster than random reading from
memory [1].
Add the GC overhead, and you get an explanation for your results.

[1] The Pathologies of Big Data, http://queue.acm.org/detail.cfm?id=1563874

Cheers,
--
Gianmarco

Re: Review Request: Out-of-core messages

Posted by Maja Kabiljo <ma...@fb.com>.
Avery, thank you for your comments.

On 8/2/12 8:58 AM, "Avery Ching" <av...@gmail.com> wrote:

>Hi Maja,
>
>Thanks for publishing your results!  Really nice performance
>improvement.  I have some questions/comments inline.
>
>On 8/1/12 11:43 AM, Maja Kabiljo wrote:
>> I've been running some benchmarking of this solution, I put in Excel
>> document in the attachment. There are some results of PageRankBenchmark
>> and RandomMessagesBenchmark. Sheets 'Page Rank 3', 'Page Rank 4' and
>> 'Messages 3' show the cases in which we run out of memory. Shortest
>>Paths
>> algorithm uses messages very little when compared to the amount of other
>> data, so there I couldn't see any differences between solutions.
>> Interesting cases are 'Page Rank 2' and 'Messages 2' where I guess we
>>are
>> very tight on memory so going out of core helps (I ran those a few times
>> since, but keep getting the same results).
>Are you saying that out-of-core is faster that hitting memory boundaries
>(i.e. GC)?  It is a bit tough to imagine that out-of-core beats in-core
>=).

That's the only explanation I could think of, honestly it sounds wrong to
me too. But those are the results I keep getting. If someone has a better
one I'd love to hear it :-)

>
>> We can also see that execution
>> time is improved with just SimpleMessageStore, since in current
>> implementation we copy messages around when we store them in vertex.
>So the performance difference can be explained by reducing memory copies?

That's correct.

>
>> I also tried running RandomMessagesBenchmark with really huge amount of
>> messages, but it crashed because message store didn't process messages
>> fast enough and worker got flooded with unprocessed requests. So in
>>cases
>> like that the only thing which could help us would be to decrease the
>> speed of compute executions. But I think this is something that
>>shouldn't
>> happen in real applications - this benchmark doesn't use received
>>messages
>> at all, in a real application executions are going to be slower anyway
>>if
>> they have to process that much data. Anyway, it would be good to have a
>> real problem which uses messages intensively and then we could see
>>what's
>> really going on.
>A question here:  Could we have set the max messages to a lower value to
>prevent the crashing?  What error did you actually see in this case?

I saw that the message store keeps the number of messages in memory below
the limit, and that it doesn't create many infrastructure objects either.
But we run out of memory anyway. So my guess is that data arrives at the
server but is not processed yet. I'll try to investigate more. Lowering
the max messages wouldn't help, since I even tried it with settings like
a max of 1M messages of size 50B (~50MB total).

>
>> As a conclusion, to start with, maybe I can create a smaller patch from
>> this which only adds SimpleMessageStore, since as we can see keeping
>> messages outside of vertices helps. And then, once the RPC is removed,
>>we
>> will be able to finally remove putMessages/getMessages/getNumMessages
>> functions from Vertex.
>I think some folks are really going to like that.  It can allow them to
>directly implement MutableVertex (I think).
>
>> For the out-of-core part, if we still offer the
>> option not to use it as default, I see no harm of adding it also, and as
>> you can see there are benefits in some cases.
>I don't see any harm here at all.
>
>> Another thing, I think I should explain what from GIRAPH-45 discussion
>>am
>> I actually using here, since I don't use bloomfilters and BTrees. The
>>way
>> it works is the following:
>> - Inside the outer message store we have message stores for each of the
>> partitions separately.
>> - Partition message stores keep data in ordered map (ordered by vertex
>>id).
>> - In outer messages store we check if we should flush something (do we
>> have more than allowed number of messages in memory). While we do, we
>> flush the partition with largest number of messages in memory.
>> - When partition messages store is flushed, all the data is written to a
>> file in the order of vertex ids, file content is like:
>> num_vertices
>> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
>> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
>> ...
>> - In the end each partition will have some messages in memory, and N
>> files, where N is the number of times it was flushed.
>> - When it's time to do the computation, within a single partition we
>>call
>> compute methods in order of vertex ids.
>> - We use buffered streams and read data from all partition files
>> sequentially, since we'll need data in the same order it's written in
>>each
>> of the files. This way we limit number of random file accesses.
>>
>> Maja
>>
>>
>>
>>
>> On 7/24/12 1:45 AM, "Avery Ching" <ac...@apache.org> wrote:
>>
>>> We should integrate the partitioning of the graph into the input
>>> superstep to get locality as well.  We can use MapReduce to try and
>>> schedule the map tasks (workers) closest to its data and then make the
>>> workers smart enough to only try to load their partitions.
>>>
>>> On 7/22/12 4:30 PM, Claudio Martella wrote:
>>>> I see your reasoning. In general I'm being open to use MR when
>>>> necessary (e.g. i used to propose it instead of the automatic vertex
>>>> creation), here it could get tricky. I can see additional HDFS usage
>>>> as down (you have to be able to store 2x the graph). However, once the
>>>> graph is pre-filtered, this additional stage would not be necessary
>>>> again for the successive jobs (only when a different number of workers
>>>> is used). Though, it does add a not so small passage to the process.
>>>>
>>>> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta
>>>><al...@fb.com>
>>>> wrote:
>>>>> Exactly. On paper, the amount of data around should be the same as
>>>>> during
>>>>> the computation, but in practice we do use a lot more memory.
>>>>> You can play with the settings and just push the problem a little
>>>>> farther
>>>>> away, by caching less and flushing requests more frequently, so then
>>>>> the
>>>>> bottleneck is on the servers.
>>>>> We're basically sending (k-1)/k of the graph through the network,
>>>>> where k
>>>>> is the number of workers.
>>>>>
>>>>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
>>>>> MapReduce is
>>>>> really good at (sorting and aggregating) in a probably inefficient
>>>>>(or
>>>>> at
>>>>> least non-scalable) way.
>>>>> We could try implementing it with a MapReduce job instead, where the
>>>>> mappers take input splits and emit (partition_id, vertex) (they would
>>>>> have
>>>>> access to the partitioner) and reducers just output the built
>>>>> partitions
>>>>> to HDFS.
>>>>> The computation stage would then be the usual Giraph job, where each
>>>>> worker knows where to get its partitions from HDFS.
>>>>> I can try making this change and see how it goes. It would just be
>>>>>one
>>>>> MR
>>>>> job, so we're not selling our souls to iterative MR.
>>>>>
>>>>> I can also see many cases where one might not want to shuffle
>>>>>vertices
>>>>> around at all: each worker reads a roughly equal part of the input
>>>>> (forget
>>>>> about bigger vertices for now) and simply communicates its own vertex
>>>>> ids
>>>>> to the master. Partition "a posteriori" instead of "a priori".
>>>>>
>>>>> What do you think?
>>>>>
>>>>> On 7/20/12 9:42 PM, "Eli Reisman" <in...@gmail.com> wrote:
>>>>>
>>>>>> What we are seeing in the metrics is the three-way load of
>>>>>>
>>>>>> 1. reading InputSplits from HDFS (mostly over the wire as there is
>>>>>>no
>>>>>> locality right now)
>>>>>> 2. creating temporary collections of vertices, sending them on netty
>>>>>> 3. simultaneously receiving collections of vertices on netty from
>>>>>> remote
>>>>>> nodes that will be place in the local workers' partitions for
>>>>>> processing
>>>>>> stages
>>>>


Re: Review Request: Out-of-core messages

Posted by Avery Ching <av...@gmail.com>.
Hi Maja,

Thanks for publishing your results!  Really nice performance 
improvement.  I have some questions/comments inline.

On 8/1/12 11:43 AM, Maja Kabiljo wrote:
> I've been running some benchmarking of this solution, I put in Excel
> document in the attachment. There are some results of PageRankBenchmark
> and RandomMessagesBenchmark. Sheets 'Page Rank 3', 'Page Rank 4' and
> 'Messages 3' show the cases in which we run out of memory. Shortest Paths
> algorithm uses messages very little when compared to the amount of other
> data, so there I couldn't see any differences between solutions.
> Interesting cases are 'Page Rank 2' and 'Messages 2' where I guess we are
> very tight on memory so going out of core helps (I ran those a few times
> since, but keep getting the same results).
Are you saying that out-of-core is faster than hitting memory boundaries
(i.e. GC)?  It is a bit tough to imagine that out-of-core beats in-core =).

> We can also see that execution
> time is improved with just SimpleMessageStore, since in current
> implementation we copy messages around when we store them in vertex.
So the performance difference can be explained by reducing memory copies?

> I also tried running RandomMessagesBenchmark with really huge amount of
> messages, but it crashed because message store didn't process messages
> fast enough and worker got flooded with unprocessed requests. So in cases
> like that the only thing which could help us would be to decrease the
> speed of compute executions. But I think this is something that shouldn't
> happen in real applications - this benchmark doesn't use received messages
> at all, in a real application executions are going to be slower anyway if
> they have to process that much data. Anyway, it would be good to have a
> real problem which uses messages intensively and then we could see what's
> really going on.
A question here:  Could we have set the max messages to a lower value to 
prevent the crashing?  What error did you actually see in this case?

> As a conclusion, to start with, maybe I can create a smaller patch from
> this which only adds SimpleMessageStore, since as we can see keeping
> messages outside of vertices helps. And then, once the RPC is removed, we
> will be able to finally remove putMessages/getMessages/getNumMessages
> functions from Vertex.
I think some folks are really going to like that.  It can allow them to 
directly implement MutableVertex (I think).

> For the out-of-core part, if we still offer the
> option not to use it as default, I see no harm of adding it also, and as
> you can see there are benefits in some cases.
I don't see any harm here at all.

> Another thing, I think I should explain what from GIRAPH-45 discussion am
> I actually using here, since I don't use bloomfilters and BTrees. The way
> it works is the following:
> - Inside the outer message store we have message stores for each of the
> partitions separately.
> - Partition message stores keep data in ordered map (ordered by vertex id).
> - In outer messages store we check if we should flush something (do we
> have more than allowed number of messages in memory). While we do, we
> flush the partition with largest number of messages in memory.
> - When partition messages store is flushed, all the data is written to a
> file in the order of vertex ids, file content is like:
> num_vertices
> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
> ...
> - In the end each partition will have some messages in memory, and N
> files, where N is the number of times it was flushed.
> - When it's time to do the computation, within a single partition we call
> compute methods in order of vertex ids.
> - We use buffered streams and read data from all partition files
> sequentially, since we'll need data in the same order it's written in each
> of the files. This way we limit number of random file accesses.
>
> Maja
>
>
>
>
> On 7/24/12 1:45 AM, "Avery Ching" <ac...@apache.org> wrote:
>
>> We should integrate the partitioning of the graph into the input
>> superstep to get locality as well.  We can use MapReduce to try and
>> schedule the map tasks (workers) closest to its data and then make the
>> workers smart enough to only try to load their partitions.
>>
>> On 7/22/12 4:30 PM, Claudio Martella wrote:
>>> I see your reasoning. In general I'm being open to use MR when
>>> necessary (e.g. i used to propose it instead of the automatic vertex
>>> creation), here it could get tricky. I can see additional HDFS usage
>>> as down (you have to be able to store 2x the graph). However, once the
>>> graph is pre-filtered, this additional stage would not be necessary
>>> again for the successive jobs (only when a different number of workers
>>> is used). Though, it does add a not so small passage to the process.
>>>
>>> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta <al...@fb.com>
>>> wrote:
>>>> Exactly. On paper, the amount of data around should be the same as
>>>> during
>>>> the computation, but in practice we do use a lot more memory.
>>>> You can play with the settings and just push the problem a little
>>>> farther
>>>> away, by caching less and flushing requests more frequently, so then
>>>> the
>>>> bottleneck is on the servers.
>>>> We're basically sending (k-1)/k of the graph through the network,
>>>> where k
>>>> is the number of workers.
>>>>
>>>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
>>>> MapReduce is
>>>> really good at (sorting and aggregating) in a probably inefficient (or
>>>> at
>>>> least non-scalable) way.
>>>> We could try implementing it with a MapReduce job instead, where the
>>>> mappers take input splits and emit (partition_id, vertex) (they would
>>>> have
>>>> access to the partitioner) and reducers just output the built
>>>> partitions
>>>> to HDFS.
>>>> The computation stage would then be the usual Giraph job, where each
>>>> worker knows where to get its partitions from HDFS.
>>>> I can try making this change and see how it goes. It would just be one
>>>> MR
>>>> job, so we're not selling our souls to iterative MR.
>>>>
>>>> I can also see many cases where one might not want to shuffle vertices
>>>> around at all: each worker reads a roughly equal part of the input
>>>> (forget
>>>> about bigger vertices for now) and simply communicates its own vertex
>>>> ids
>>>> to the master. Partition "a posteriori" instead of "a priori".
>>>>
>>>> What do you think?
>>>>
>>>> On 7/20/12 9:42 PM, "Eli Reisman" <in...@gmail.com> wrote:
>>>>
>>>>> What we are seeing in the metrics is the three-way load of
>>>>>
>>>>> 1. reading InputSplits from HDFS (mostly over the wire as there is no
>>>>> locality right now)
>>>>> 2. creating temporary collections of vertices, sending them on netty
>>>>> 3. simultaneously receiving collections of vertices on netty from
>>>>> remote
>>>>> nodes that will be place in the local workers' partitions for
>>>>> processing
>>>>> stages
>>>


Re: Review Request: Out-of-core messages

Posted by Maja Kabiljo <ma...@fb.com>.
Hi Claudio,

It's fine, I wasn't trying to take credit for the things you and others
came up with, and I'm sorry if it sounded that way to you. Since there were
a lot of great ideas in the discussion, I just wanted to make clear which
ones I am using and which I am not, so that nobody has to go through the
whole code to find out.

Maja

On 8/24/12 7:23 PM, "Claudio Martella" <cl...@gmail.com> wrote:

>Hi Maja,
>
>I didn't want to minimize your effort which is great, I just wanted to
>underline the complete overlap between the previous plan and your
>work. The designed had moved from BTrees+Bloomfilters towards sorted
>partitions, as in the same discussion (17/Dec/11 15:28) and as
>presented at the Berlin workshop on the 6th of june
>(http://prezi.com/ecdgiav4oeex/bb-apache-giraph-distributed-graph-processing-in-the-cloud/).
>It really looks to me like your contribution is one of the best ways
>to tackle the problem.
>
>Best,
>Claudio
>
>On Wed, Aug 1, 2012 at 8:43 PM, Maja Kabiljo <ma...@fb.com> wrote:
>> Another thing, I think I should explain what from GIRAPH-45 discussion
>>am
>> I actually using here, since I don't use bloomfilters and BTrees. The
>>way
>> it works is the following:
>> - Inside the outer message store we have message stores for each of the
>> partitions separately.
>> - Partition message stores keep data in ordered map (ordered by vertex
>>id).
>> - In outer messages store we check if we should flush something (do we
>> have more than allowed number of messages in memory). While we do, we
>> flush the partition with largest number of messages in memory.
>> - When partition messages store is flushed, all the data is written to a
>> file in the order of vertex ids, file content is like:
>> num_vertices
>> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
>> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
>> ...
>> - In the end each partition will have some messages in memory, and N
>> files, where N is the number of times it was flushed.
>> - When it's time to do the computation, within a single partition we
>>call
>> compute methods in order of vertex ids.
>> - We use buffered streams and read data from all partition files
>> sequentially, since we'll need data in the same order it's written in
>>each
>> of the files. This way we limit number of random file accesses.
>
>
>
>-- 
>   Claudio Martella
>   claudio.martella@gmail.com


Re: Review Request: Out-of-core messages

Posted by Claudio Martella <cl...@gmail.com>.
Hi Maja,

I didn't want to minimize your effort, which is great. I just wanted to
underline the complete overlap between the previous plan and your
work. The design had moved from BTrees+Bloomfilters towards sorted
partitions, as in the same discussion (17/Dec/11 15:28) and as
presented at the Berlin workshop on the 6th of June
(http://prezi.com/ecdgiav4oeex/bb-apache-giraph-distributed-graph-processing-in-the-cloud/).
It really looks to me like your contribution is one of the best ways
to tackle the problem.

Best,
Claudio

On Wed, Aug 1, 2012 at 8:43 PM, Maja Kabiljo <ma...@fb.com> wrote:
> Another thing, I think I should explain what from GIRAPH-45 discussion am
> I actually using here, since I don't use bloomfilters and BTrees. The way
> it works is the following:
> - Inside the outer message store we have message stores for each of the
> partitions separately.
> - Partition message stores keep data in ordered map (ordered by vertex id).
> - In outer messages store we check if we should flush something (do we
> have more than allowed number of messages in memory). While we do, we
> flush the partition with largest number of messages in memory.
> - When partition messages store is flushed, all the data is written to a
> file in the order of vertex ids, file content is like:
> num_vertices
> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
> ...
> - In the end each partition will have some messages in memory, and N
> files, where N is the number of times it was flushed.
> - When it's time to do the computation, within a single partition we call
> compute methods in order of vertex ids.
> - We use buffered streams and read data from all partition files
> sequentially, since we'll need data in the same order it's written in each
> of the files. This way we limit number of random file accesses.



-- 
   Claudio Martella
   claudio.martella@gmail.com

Re: Review Request: Out-of-core messages

Posted by Eli Reisman <in...@gmail.com>.
I have been working on locality in a way that does not involve changes to
how we interact with MapReduce, but I would be very interested in taking the
more "active" Hadoop-style approach Avery mentioned if it's practical, since
it is already available in Hadoop and we already use mappers for our workers.

I was hoping to change the vertex IO formats and/or job submission code and
was puzzled why this doesn't already happen. Why was this Hadoop feature
not taken advantage of originally? Is there more to this change than it
looks like? Is there something we get out of the existing approach, or
something about locality that is a problem for Giraph as it is now?


On Wed, Aug 1, 2012 at 11:43 AM, Maja Kabiljo <ma...@fb.com> wrote:

> I've been running some benchmarking of this solution, I put in Excel
> document in the attachment. There are some results of PageRankBenchmark
> and RandomMessagesBenchmark. Sheets 'Page Rank 3', 'Page Rank 4' and
> 'Messages 3' show the cases in which we run out of memory. Shortest Paths
> algorithm uses messages very little when compared to the amount of other
> data, so there I couldn't see any differences between solutions.
> Interesting cases are 'Page Rank 2' and 'Messages 2' where I guess we are
> very tight on memory so going out of core helps (I ran those a few times
> since, but keep getting the same results). We can also see that execution
> time is improved with just SimpleMessageStore, since in current
> implementation we copy messages around when we store them in vertex.
>
> I also tried running RandomMessagesBenchmark with really huge amount of
> messages, but it crashed because message store didn't process messages
> fast enough and worker got flooded with unprocessed requests. So in cases
> like that the only thing which could help us would be to decrease the
> speed of compute executions. But I think this is something that shouldn't
> happen in real applications - this benchmark doesn't use received messages
> at all, in a real application executions are going to be slower anyway if
> they have to process that much data. Anyway, it would be good to have a
> real problem which uses messages intensively and then we could see what's
> really going on.
>
> As a conclusion, to start with, maybe I can create a smaller patch from
> this which only adds SimpleMessageStore, since as we can see keeping
> messages outside of vertices helps. And then, once the RPC is removed, we
> will be able to finally remove putMessages/getMessages/getNumMessages
> functions from Vertex. For the out-of-core part, if we still offer the
> option not to use it as default, I see no harm of adding it also, and as
> you can see there are benefits in some cases.
>
> Another thing, I think I should explain what from GIRAPH-45 discussion am
> I actually using here, since I don't use bloomfilters and BTrees. The way
> it works is the following:
> - Inside the outer message store we have message stores for each of the
> partitions separately.
> - Partition message stores keep data in ordered map (ordered by vertex id).
> - In outer messages store we check if we should flush something (do we
> have more than allowed number of messages in memory). While we do, we
> flush the partition with largest number of messages in memory.
> - When partition messages store is flushed, all the data is written to a
> file in the order of vertex ids, file content is like:
> num_vertices
> vertex_1_id num_messages_1 message_1_1 message_1_2 ...
> vertex_2_id num_messages_2 message_2_1 message_2_2 ...
> ...
> - In the end each partition will have some messages in memory, and N
> files, where N is the number of times it was flushed.
> - When it's time to do the computation, within a single partition we call
> compute methods in order of vertex ids.
> - We use buffered streams and read data from all partition files
> sequentially, since we'll need data in the same order it's written in each
> of the files. This way we limit number of random file accesses.
>
> Maja
>
>
>
>
> On 7/24/12 1:45 AM, "Avery Ching" <ac...@apache.org> wrote:
>
> >We should integrate the partitioning of the graph into the input
> >superstep to get locality as well.  We can use MapReduce to try and
> >schedule the map tasks (workers) closest to its data and then make the
> >workers smart enough to only try to load their partitions.
> >
> >On 7/22/12 4:30 PM, Claudio Martella wrote:
> >> I see your reasoning. In general I'm open to using MR when
> >> necessary (e.g. I used to propose it instead of the automatic vertex
> >> creation), but here it could get tricky. I can see the additional HDFS
> >> usage as a downside (you have to be able to store 2x the graph).
> >> However, once the graph is pre-filtered, this additional stage would
> >> not be necessary again for successive jobs (only when a different
> >> number of workers is used). Still, it does add a not-so-small step to
> >> the process.
> >>
> >> On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta <al...@fb.com> wrote:
> >>> Exactly. On paper, the amount of data around should be the same as
> >>> during the computation, but in practice we do use a lot more memory.
> >>> You can play with the settings and just push the problem a little
> >>> farther away, by caching less and flushing requests more frequently,
> >>> so then the bottleneck is on the servers.
> >>> We're basically sending (k-1)/k of the graph through the network,
> >>> where k is the number of workers.
> >>>
> >>> What I'm thinking is that in INPUT_SUPERSTEP we're doing what
> >>> MapReduce is really good at (sorting and aggregating) in a probably
> >>> inefficient (or at least non-scalable) way.
> >>> We could try implementing it with a MapReduce job instead, where the
> >>> mappers take input splits and emit (partition_id, vertex) (they would
> >>> have access to the partitioner) and reducers just output the built
> >>> partitions to HDFS.
> >>> The computation stage would then be the usual Giraph job, where each
> >>> worker knows where to get its partitions from HDFS.
> >>> I can try making this change and see how it goes. It would just be one
> >>> MR job, so we're not selling our souls to iterative MR.
> >>>
> >>> I can also see many cases where one might not want to shuffle vertices
> >>> around at all: each worker reads a roughly equal part of the input
> >>> (forget about bigger vertices for now) and simply communicates its own
> >>> vertex ids to the master. Partition "a posteriori" instead of "a priori".
> >>>
> >>> What do you think?
> >>>
> >>> On 7/20/12 9:42 PM, "Eli Reisman" <in...@gmail.com> wrote:
> >>>
> >>>> What we are seeing in the metrics is the three-way load of
> >>>>
> >>>> 1. reading InputSplits from HDFS (mostly over the wire as there is no
> >>>> locality right now)
> >>>> 2. creating temporary collections of vertices, sending them on netty
> >>>> 3. simultaneously receiving collections of vertices on netty from
> >>>> remote nodes that will be placed in the local workers' partitions for
> >>>> processing stages
> >>
> >>
>
>
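
Two of the points quoted above are easy to check on a toy example: the estimate that the input superstep sends (k-1)/k of the graph over the network, and the idea of building partitions by emitting (partition_id, vertex) pairs and grouping them. The sketch below is only an illustration under assumptions, not Giraph code: `owner_of` is a hypothetical modulo partitioner, each worker is assumed to read one contiguous, equally sized slice of the input with no locality, and plain integers stand in for vertices.

```python
from collections import defaultdict
from fractions import Fraction


def owner_of(vertex_id, num_workers):
    """Hypothetical partitioner: a vertex belongs to worker (id mod k)."""
    return vertex_id % num_workers


def shuffle_fraction(num_vertices, num_workers):
    """Fraction of vertices that are read by a worker that does not own them.

    Worker w reads the w-th contiguous slice of the input (its split).
    Any leftover vertices beyond k equal slices are ignored for simplicity.
    """
    split_size = num_vertices // num_workers
    total = split_size * num_workers
    moved = 0
    for vertex_id in range(total):
        reader = vertex_id // split_size
        if owner_of(vertex_id, num_workers) != reader:
            moved += 1
    return Fraction(moved, total)


def build_partitions(vertex_ids, num_workers):
    """Emit (partition_id, vertex) per vertex, then group by partition_id.

    This mirrors the map and reduce roles in a single in-process pass.
    """
    partitions = defaultdict(list)
    for vertex_id in vertex_ids:
        partitions[owner_of(vertex_id, num_workers)].append(vertex_id)
    return dict(partitions)
```

For 1000 vertices and 10 workers, `shuffle_fraction` gives exactly 9/10, which matches (k-1)/k for k = 10. A partitioner that happened to agree with the input splits would bring this fraction down, which is the locality argument made earlier in the thread.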