Posted to dev@giraph.apache.org by "Eli Reisman (JIRA)" <ji...@apache.org> on 2012/09/07 01:29:09 UTC

[jira] [Created] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Eli Reisman created GIRAPH-322:
----------------------------------

             Summary: Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs
                 Key: GIRAPH-322
                 URL: https://issues.apache.org/jira/browse/GIRAPH-322
             Project: Giraph
          Issue Type: Improvement
          Components: bsp
    Affects Versions: 0.2.0
            Reporter: Eli Reisman
            Assignee: Eli Reisman
            Priority: Minor
             Fix For: 0.2.0


Vertex#sendMessageToAllEdges is a case that goes against the grain of the data structures and code paths used to transport messages through a Giraph application and out on the network. While messages to a single vertex can (and should) be combined in some applications that could make use of this broadcast messaging, the out-of-control message growth of algorithms like triangle closing means we need to de-duplicate messages bound for many vertices/partitions.

This will be an evolving solution (this first patch is just the first step), and it does not yet present a robust solution for disk-spill message stores. I figure I can get some advice about that, or it can be a follow-up JIRA if this turns out to be a fruitful pursuit. This first patch is also Netty-only and simply falls back to the old sendMessageToAllEdges() implementation if USE_NETTY is false. All this can be cleaned up when we know this works and/or is worth pursuing.

The idea is to send as few broadcast messages as possible by run-length encoding their delivery, duplicating a message on the network only when copies are bound for different partitions. This works best when combined with "-Dhash.userPartitionCount=# of workers" so that cross-partition duplication is kept to a minimum.
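To make that concrete, here is a minimal, hypothetical sketch of the grouping step (the class and method names are illustrative, not the patch's actual code): broadcast targets are keyed by destination partition so the message payload crosses the wire once per partition rather than once per target vertex.

```java
import java.util.*;

// Hypothetical sketch of the de-duplication idea: instead of serializing
// the same broadcast message once per target vertex, group the target
// vertex ids by destination partition and send the payload once per
// partition, followed by the run of target ids.
public class BroadcastEncoder {
  // Stand-in for the graph partitioner's vertex-to-partition mapping.
  static int partitionFor(long vertexId, int partitionCount) {
    return (int) (vertexId % partitionCount);
  }

  // Returns partition -> list of target vertex ids, so each partition
  // receives one copy of the message plus its id run.
  static Map<Integer, List<Long>> groupTargets(List<Long> targets, int partitionCount) {
    Map<Integer, List<Long>> byPartition = new HashMap<>();
    for (long id : targets) {
      byPartition
          .computeIfAbsent(partitionFor(id, partitionCount), k -> new ArrayList<>())
          .add(id);
    }
    return byPartition;
  }

  public static void main(String[] args) {
    Map<Integer, List<Long>> grouped = groupTargets(Arrays.asList(1L, 2L, 3L, 4L), 2);
    // With 2 partitions, the payload crosses the wire twice instead of
    // four times, which is why fewer partitions per worker helps.
    System.out.println(grouped.size());
  }
}
```

This also shows why "-Dhash.userPartitionCount=# of workers" matters here: the fewer distinct partitions the targets hash into, the fewer payload copies go out.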

If this shows promise I will report back and keep working on it. As it stands, it represents an end-to-end solution, using Netty, for in-memory messaging. It won't break with spill to disk, but you do lose the de-duplicating effect.

More to follow, comments/ideas welcome. I expect this to change a lot as I test it and ideas/suggestions crop up.


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454229#comment-13454229 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

No problem, Hyunsik. This patch is still in flux and not ready for action yet, but if you want to play with the test.sh script, go right ahead!

Maja: That's good to know about spill to disk; that is not what I saw before, so I should definitely play with this some more. I must have set the params wrong. The thing about disk spill here is that no one seems to bite on the idea of replacing any existing solutions with Giraph if spill to disk is involved; most folks here are already convinced that a solution that spills to disk too much is not different enough from the existing solutions for a given use case to warrant changing the approach. The use cases for Giraph here revolve around scaling out on many machines and running in-memory. I am hoping that by getting your params set right and using spill only as an emergency buffer when memory is too tight, we can gradually loosen up this view and let users decide for themselves whether it's an option they want to exercise; but as it stands now, I need to pursue solutions that are at least feasible in memory first. I am excited to see this might still be a viable way to make this work. Thanks for the advice!

As far as where the crash occurs: the whole graph I'm using loads with no problem and can exist in memory. In my tests of an earlier messaging solution and GIRAPH-314, amortizing over different numbers of supersteps, the in-memory graph data was not where the errors were reported. They were coming on the Netty receiving side as all the messages arrived at their destinations. When the messages arrive, the receiver is (for the most part) aggregating how many times a 2nd-degree neighbor is encountered in incoming messages, so the data we aggregate at each vertex does not grow as fast as the accumulated size of the messages the vertex receives. That's why the amortizing helped so much: if we can process each message promptly, we can GC it quickly, and the local storage per vertex only grows when a new unique neighbor ID is encountered in an incoming message.
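As a toy illustration of that aggregation pattern (assumed from the description above, not taken from SimpleTriangleClosingVertex itself): per-vertex state grows only with unique neighbor ids, while the raw message count can be much larger.

```java
import java.util.*;

// Sketch of the receiver-side aggregation described above: each incoming
// message names a 2nd-degree neighbor, and the vertex stores only a
// count per UNIQUE neighbor id. Once a message is counted it can be
// garbage collected, so vertex state grows far more slowly than the
// accumulated size of received messages.
public class NeighborCounter {
  static Map<Long, Integer> count(long[] incomingNeighborIds) {
    Map<Long, Integer> counts = new HashMap<>();
    for (long id : incomingNeighborIds) {
      counts.merge(id, 1, Integer::sum);  // message droppable after this
    }
    return counts;
  }

  public static void main(String[] args) {
    // Six messages arrive, but only three unique ids are held in state.
    long[] incoming = {5, 9, 5, 5, 9, 12};
    System.out.println(count(incoming).size());
  }
}
```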

So...we'll see what actually happens; this is an experiment after all, but so far so good. The volume on the network, plus the messages sitting on the sender or receiver end before processing in a given superstep, seems to represent the main memory-overload danger. If we amortize message volume over supersteps and de-duplicate transmissions, this pressure is relieved; how much remains to be seen. I am still working from a laptop and haven't been able to get set up for testing yet, so no word on results. But I think your fix was just the right thing to get past the error I was seeing, thanks!

Three things I want to try in another iteration on this patch right away:

1) Try to use the PartitionOwner to send one copy to each host worker and fan out duplicate references to partitions on the remote owner, rather than grouping them early as we discussed. That way, even when a use case does not combine this with -Dhash.userPartitionCount, we can at least de-duplicate the volume of messages on the wire to the same host. If this goes well, it will not be hard to open another JIRA and convert SendMessageCache similarly.

2) Make sure the solution is robust for non-idempotent messaging by replacing the M -> Set<I> model with an M -> List<I> model that retains duplicate deliveries of the same message to the same vertex. In cases like PageRank, where the same DoubleWritable might be sent to the same vertex from neighbors in multiple partitions on the same sending worker, each copy has to be seen as a "fresh delivery" of that value as the receiver iterates over its messages.

3) Try it out and see what happens!
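Point 2 above can be sketched as follows. The maps here are illustrative stand-ins, not Giraph's actual message store types, but they show why keying delivery targets as a Set silently drops the second copy of an identical value.

```java
import java.util.*;

// Sketch of the M -> Set<I> vs. M -> List<I> distinction: two deliveries
// of the SAME message value to the SAME vertex (e.g. two neighbors on
// this worker both sending 0.25 to vertex 7 in PageRank) must both
// survive, because the receiver sums one contribution per sender.
public class DeliveryModel {
  public static void main(String[] args) {
    // M -> List<I>: one entry per delivery, duplicates preserved.
    Map<Double, List<Long>> asList = new HashMap<>();
    asList.computeIfAbsent(0.25, k -> new ArrayList<>()).add(7L);
    asList.computeIfAbsent(0.25, k -> new ArrayList<>()).add(7L);

    // M -> Set<I>: the second identical delivery is silently collapsed.
    Map<Double, Set<Long>> asSet = new HashMap<>();
    asSet.computeIfAbsent(0.25, k -> new HashSet<>()).add(7L);
    asSet.computeIfAbsent(0.25, k -> new HashSet<>()).add(7L);

    System.out.println(asList.get(0.25).size());  // both deliveries kept
    System.out.println(asSet.get(0.25).size());   // one delivery lost
  }
}
```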

Thanks for all the advice, I really appreciate your input. I'll have another patch up soon to see if we're heading in the right direction with this...




                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453196#comment-13453196 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

Thanks Maja, great advice. I will look into all these things. I know exactly what you mean about the spill to disk; it will require some thought to integrate. That has been the problem with this whole approach: Giraph is hard-wired to Partition -> Vertex -> List<M>, and broadcast-friendly data structures really go against the grain.

One issue that also makes spill to disk tricky is that once spilled, each M object is written out over and over (rather than a reference id or similar), and when read back the copies are "fresh instances" again, not references to the same object. So we get a bunch of re-duplication on read from disk. Eliminating this problem will require some thought and overhauls.
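A rough sketch of the cost being described (the sizes and names are illustrative, not measurements from Giraph): in memory, many target ids can share one message object by reference, but a naive spill writes the payload bytes once per target and reads back that many fresh instances.

```java
import java.io.*;

// Illustrates the re-duplication problem: a payload shared by reference
// in memory gets serialized once per target when naively spilled, versus
// a hypothetical spill format that writes the payload once followed by
// the run of target ids.
public class SpillDuplication {
  static byte[] serialize(String message) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      new DataOutputStream(bos).writeUTF(message);
      return bos.toByteArray();
    } catch (IOException e) {          // cannot happen for an in-memory stream
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    String payload = "broadcast-payload";
    int targets = 1000;
    long naiveBytes = (long) targets * serialize(payload).length; // payload per target
    long dedupedBytes = serialize(payload).length + targets * 8L; // payload once + 8-byte ids
    System.out.println(naiveBytes > dedupedBytes);
  }
}
```

The gap widens with payload size, which is why re-duplication on disk read undercuts the whole de-duplication scheme.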

Our data already fits in memory well; it's just a matter of getting the messaging out of an N^2 growth situation and into an (N * constant) range, where the amortizing can do the rest of the job. The nice thing about the amortizing (it's an ugly solution, I know) is that when only some workers are sending on any given superstep, those supersteps pass extremely quickly. Once something works well, I will attempt to generalize the solution. Forcing the amortizing responsibility onto application users is not ideal, even if SimpleTriangleClosingVertex provides an example to copy.
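As I understand the amortizing idea from the discussion above (the slicing rule here is a hypothetical stand-in, not the actual SimpleTriangleClosingVertex code), vertices take turns broadcasting across supersteps so peak message volume is divided by the number of slices:

```java
// Hypothetical sketch of amortized broadcasting: only vertices whose id
// falls in the current "slice" send during a given superstep, spreading
// the peak message volume over SLICES supersteps.
public class AmortizedBroadcast {
  static final int SLICES = 4;  // illustrative constant

  // True if this vertex should broadcast during this superstep.
  static boolean myTurn(long vertexId, long superstep) {
    return vertexId % SLICES == superstep % SLICES;
  }

  public static void main(String[] args) {
    int sendersAtStep0 = 0;
    for (long v = 0; v < 100; v++) {
      if (myTurn(v, 0)) { sendersAtStep0++; }
    }
    // Only ~1/SLICES of vertices send in any one superstep.
    System.out.println(sendersAtStep0);
  }
}
```

The trade-off is the one named above: total work is the same, but memory pressure per superstep drops, at the cost of pushing scheduling logic into the application.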

Thanks for the tip about the constructor; the one from the original class (if I remember right) was just empty braces, so I left it to default. Nice save!

So you're saying that if I set the command-line opts correctly, the Vertex#compute() cycle will actually pause on each worker while the message system copies to/from disk whenever messages pile up? If so, this is worth playing with for sure. I did not see that behavior when I was trying the disk-spill options; maybe I just set it up wrong? Without the compute cycle stopping during spill reads/writes, the messages just pile up in memory instead of on the network. Either way, the re-duplication on disk read is going to keep that from being a one-stop solution for us.

As far as 1 partition per worker: it's funny, I considered an alternate JIRA when I was coding this to address the SendMessageCache issue you mentioned. The reason for lots of small partitions per worker was to redistribute them evenly on worker crash, but it hasn't really hurt us to use 1 per worker for this purpose: it drastically reduces the messages required per worker, and there is no redistribution of partitions on worker crash anyway; a crash just ends the job run in Giraph's current form. A fix for both message caches in this regard is a great idea; I will take a second look at this.

As for how I think the folks on this end want the final solution to work: we're working toward "spill to disk only when there's no other choice," so if it is part of the solution, it should ideally be there just as an emergency buffer against overload that kicks in once in a while. But just getting it to work at the scale we want is my main goal; we can tune the use case after that!

OK, off to try this stuff...thanks so much for your thoughtful input!
                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Maja Kabiljo (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454723#comment-13454723 ] 

Maja Kabiljo commented on GIRAPH-322:
-------------------------------------

I agree it's always better if you can do things in-core. I'm asking about that part mainly because you mentioned trying it and that it didn't work, which is not what I expected to happen.

For 4), please note that WorkerInfo.partitionId values are not actually Giraph partition ids, but rather the values of mapred.task.partition, i.e. they are per worker, not per Giraph partition. I noticed this while implementing distributed aggregators, and I agree the naming is very bad; we should change it to something different to avoid confusion.

I'm moving part of the discussion back to GIRAPH-314.

                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-5.patch

Brings this into harmony with the approach of GIRAPH-328. Passes mvn verify etc., but there are more loose ends to tie up before this is a robust solution. I'm going to concentrate on getting GIRAPH-328 done, then continue working on this.

                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-4.patch

This patch includes some tweaks and improvements. I tried several ways to remove the duplication-per-partition on the sender side, and learned this:

1) It can definitely be done, and it would de-duplicate a lot of messages for all code paths from Vertex#sendMessage etc.

2) It touches more code than I feel comfortable including in this JIRA; it should really be a separate JIRA, and we should do sendMessage() and sendMessageToAllEdges() at the same time.

3) I can test GIRAPH-322 just fine using "-Dhash.userPartitionCount=# of workers" to see what comes of this, and get it committed as its own fix, rolling the partition de-duplication code into the other JIRA mentioned in #2. That idea can then be judged on its own merits (or not).

4) For future reference, the JIRA mentioned in #2 would require the WorkerInfo/PartitionOwner plumbing to be per-worker instances rather than per-partition, and would require the Netty request acks like ClientRequestId to use the host-port combo for that worker as a "destinationWorkerId" rather than the WorkerInfo's partitionId. That's about it. This would be a good JIRA, a real win I think.

So, here's a version that should bear some testing. I'm still on a laptop, but when I get my Giraph rig set up again at home I will definitely begin testing. More soon...
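Point 4 can be sketched roughly like this; the host:port key and all names here are hypothetical stand-ins for the real WorkerInfo/ClientRequestId plumbing, only meant to show why a per-worker key collapses many partitions into one wire destination.

```java
import java.util.*;

// Hypothetical sketch: keying request destinations by a per-worker
// identity (host:port) rather than by a partition id, so one worker
// holding many partitions is still a single destination on the wire.
public class WorkerKeying {
  static String destinationWorkerId(String host, int port) {
    return host + ":" + port;
  }

  public static void main(String[] args) {
    // Three partitions hosted on the same worker collapse to one
    // destination; keyed by partitionId they would look like three.
    Set<String> destinations = new HashSet<>();
    int[] partitionIdsOnWorker = {0, 1, 2};
    for (int p : partitionIdsOnWorker) {
      destinations.add(destinationWorkerId("worker-a.example.com", 30050));
    }
    System.out.println(destinations.size());
  }
}
```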


                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13471912#comment-13471912 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

I'm going to rethink and redo/rebase this patch given what has evolved from GIRAPH-328. Stay tuned...

                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-1.patch

This is an "end to end" solution, but the glue is ugly and I still need to test it, so although it passes mvn verify etc., it might not work at all. I plan to tune, adjust, bug fix, and resubmit a few times before having anything useful. We're still at the experimental stage here for sure.

Suggestions welcome!

                
> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-322
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-322
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-322-1.patch
>
>
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the data structures and code paths used to transport messages through a Giraph application and out on the network. While messages to a single vertex can be combined (and should be) in some applications that could make use of this broadcast messaging, the out of control message growth of algorithms like triangle closing means we need to de-duplicate messages bound for many vertices/partitions.
> This will be an evolving solution (this first patch is just the first step) and currently it does not present a robust solution for disk-spill message stores. I figure I can get some advice about that or it can be a follow-up JIRA if this turns out to be a fruitful pursuit. This first patch is also Netty-only and simply defaults to the old sendMessagesToAllEdges() implementation if USE_NETTY is false. All this can be cleaned up when we know this works and/or is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length encoding their delivery and only duplicating message on the network when they are bound for different partitions. This is also best when combined with "-Dhash.userPartitionCount=# of workers" so you don't do too much of that.
> If this shows promise I will report back and keep working on this. As it is, it represents an end-to-end solution, using Netty, for in-memory messaging. It won't break with spill to disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I test it and ideas/suggestions crop up.
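The de-duplication idea described above can be sketched roughly as follows. This is an illustrative sketch only, not the patch's actual API: class and method names here are hypothetical, and a trivial modulo partitioner stands in for Giraph's real partitioning logic. The point is that destinations are grouped by partition so the message payload crosses the wire once per partition instead of once per out-edge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of run-length-encoded broadcast delivery:
// instead of serializing one (message, destination) pair per edge,
// group destination vertex ids by partition and send one copy of the
// message per partition together with the id list.
public class BroadcastSketch {
  // Stand-in for the real partitioner (e.g. a hash partitioner).
  static int partitionOf(long vertexId, int partitionCount) {
    return (int) (vertexId % partitionCount);
  }

  // Returns partition -> destination ids, so the payload is serialized
  // once per partition rather than once per out-edge.
  static Map<Integer, List<Long>> groupByPartition(List<Long> targets,
      int partitions) {
    Map<Integer, List<Long>> grouped = new HashMap<>();
    for (long id : targets) {
      grouped.computeIfAbsent(partitionOf(id, partitions),
          k -> new ArrayList<>()).add(id);
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Long> targets = List.of(1L, 2L, 3L, 4L, 5L, 6L);
    Map<Integer, List<Long>> grouped = groupByPartition(targets, 2);
    // Only 2 copies of the payload cross the wire instead of 6.
    System.out.println(grouped.size()); // 2
  }
}
```

This also illustrates why the issue description recommends "-Dhash.userPartitionCount=# of workers": with one partition per worker, the number of duplicated payloads on the network drops to at most one per destination worker.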

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13486431#comment-13486431 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

I am waiting to see how the message passing plumbing shapes up so I don't get in the way of that; then we can deal with ideas around this, if they need to be dealt with at all at that point. The purpose of 322 is message deduplication when we know it can be done (as in broadcast messaging from the vertex API), so if there's a need, I'll come back to this.

If this is one of the JIRA issues holding up the new release, we can resolve it as Won't Fix and I'll post a fresh JIRA someday.

                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Maja Kabiljo (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453083#comment-13453083 ] 

Maja Kabiljo commented on GIRAPH-322:
-------------------------------------

You are missing the default constructor for SendBroadcastMessageRequest. If you are testing it with a smaller example, this can be the cause of the problem you described, since the flush will be the first moment when messages get sent and processed. You should be able to see the exception, though, if this is the only problem.
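For context, request classes sent over the wire are typically instantiated by reflection on the receiving side before being deserialized, which is why a no-arg constructor is required. The sketch below is illustrative, not the actual SendBroadcastMessageRequest code; it only shows the Writable-style pattern and why reflection fails without the default constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrative sketch: the receiver first builds an EMPTY instance via
// reflection, then populates it from the wire with readFields-style I/O.
public class RequestSketch {
  private int partitionId;
  private byte[] payload;

  // Required: reflection (getDeclaredConstructor().newInstance())
  // cannot create the object without a no-arg constructor.
  public RequestSketch() { }

  public RequestSketch(int partitionId, byte[] payload) {
    this.partitionId = partitionId;
    this.payload = payload;
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(partitionId);
    out.writeInt(payload.length);
    out.write(payload);
  }

  public void readFields(DataInput in) throws IOException {
    partitionId = in.readInt();
    payload = new byte[in.readInt()];
    in.readFully(payload);
  }

  public int getPartitionId() { return partitionId; }

  public static void main(String[] args) throws Exception {
    RequestSketch sent = new RequestSketch(7, new byte[] {1, 2, 3});
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    sent.write(new DataOutputStream(bos));

    // Receiver side: empty instance by reflection, then deserialize.
    RequestSketch received =
        RequestSketch.class.getDeclaredConstructor().newInstance();
    received.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(received.getPartitionId()); // 7
  }
}
```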

Great work, I'm looking forward to hearing how it performs.

This is not a big deal, but when sendBroadcastMessageRequest is called you have all destinations grouped together, and then you add them one by one to the cache, where they are grouped together again.

Revisiting out-of-core to work better with this solution is not going to be straightforward, since we still want to keep the messages for one vertex grouped together in order to minimize random accesses. I'll think a bit about it and share anything I come up with.

One thing that doesn't seem good is that you are setting the number of partitions to 1 per worker. I think Avery already suggested modifying SendMessageCache to keep and send messages per worker, not per partition. Something like that could also be done with this solution.

Was your plan to use the out-of-core graph in the end also, or would that data fit in memory?

I'm really interested in hearing whether you make it work with out-of-core messages. I agree with what you said about benchmarks vs. real use cases, but for out-of-core messages I don't see how it can get worse than RandomMessageBenchmark, since one can't generate messages at a higher rate than there :-) Maybe memory is leaking elsewhere; one thing you can try is setting the number of in-core messages and open requests to something really low. It will be extremely slow, but it will show whether it crashes.
                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-6.patch

Needed to change a couple of details so I can fix them a better way in GIRAPH-328; I will continue this patch when that one is squared away. Passes mvn verify, etc.
                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Maja Kabiljo (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453941#comment-13453941 ] 

Maja Kabiljo commented on GIRAPH-322:
-------------------------------------

So it works now? Please share results from the runs when you get them.

For limiting the number of open requests, it works like this: whenever we send a request, we check how many requests we have for which we haven't received a reply yet; if that number is above the limit, we'll just wait there. So yes, vertex.compute execution will be paused. Since we send a reply only after a request is processed, this way we are also limiting the number of unprocessed requests on the receiving side. To use this option you need to set the following two parameters:
giraph.waitForRequestsConfirmation=true
giraph.maxNumberOfOpenRequests= your_limit
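These would typically be passed as Hadoop-style -D options on the command line. The jar, runner class, and computation class below are placeholders, not from this patch; only the two giraph.* parameter names come from the comment above.

```shell
# Hypothetical invocation: cap outstanding un-acknowledged requests so
# that compute() blocks once the limit is hit. Jar and class names are
# placeholders; the limit value is an example, not a recommendation.
hadoop jar giraph-examples.jar org.apache.giraph.GiraphRunner \
  -Dgiraph.waitForRequestsConfirmation=true \
  -Dgiraph.maxNumberOfOpenRequests=1000 \
  org.apache.giraph.examples.SimpleShortestPathsVertex
```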

I agree with everything you said about making this solution work with out-of-core messaging. With the current approach there is duplication, and just keeping references would hurt performance too much because of random accesses. I think we'll have to figure out some clever solution in between. But let's first see how this works with in-core stuff, and we can get to out-of-core later.

Now, kind of unrelated to this patch, but more to GIRAPH-314 and your problem size. So you are saying that you are able to keep in memory all the maps from second-degree neighbours to the number of paths to them. And you were also able to transfer all the messages before this solution, just by using amortization. So the problem in the first place was not transferring all the messages, but keeping all unprocessed messages in memory at the same time? Or am I getting this wrong? If you fit the above-mentioned maps in memory, I guess the number of edges per worker is not big? This patch is very useful for a certain kind of problem: when the messages are big objects, we don't have a smart combiner, and we have a lot of neighbours per vertex (many more than the number of workers, unless you have some smart partitioning strategy). Just trying to get a sense of why this is the best approach for your particular problem, not discussing the general idea here.
                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13455061#comment-13455061 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

The WorkerInfo comment refers to the fact that many WorkerInfos are referenced needlessly inside PartitionOwners in the messaging code to match everyone up with a partition ID. One approach that worked well in my experiments yesterday was to organize the messaging data structures around WorkerInfos instead of PartitionOwners and use those to group related messages that could be sent in a bundle together. The ClientRequestId would need to use a host/port hash from WorkerInfo, or something similar, in place of a partitionId for sequencing, but that's a small change. This wouldn't be an amazing fix, but it would give us finer-grained control over how we bundle groups of messages between nodes and avoid extra data structures to route the messages to partitions on the sending side. Anyway, I'm thinking I should put up a JIRA for this and try it as a separate issue.
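The host/port hash idea could be sketched as below. This is purely hypothetical (the class and method names are not from Giraph); it just shows that a stable identifier for sequencing can be derived from the worker's host and port instead of a partition id.

```java
import java.util.Objects;

// Hypothetical sketch: deriving a stable id from a worker's host/port
// so a ClientRequestId can be keyed per destination worker rather than
// per partition. Names are illustrative only.
public class WorkerIdSketch {
  static int workerId(String host, int port) {
    // Stable for the same (host, port); distinct workers will almost
    // always differ, though hash collisions are theoretically possible.
    return Objects.hash(host, port);
  }

  public static void main(String[] args) {
    int a = workerId("node-01", 30050);
    int b = workerId("node-01", 30050);
    System.out.println(a == b); // true: stable for the same worker
  }
}
```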

When the code died using my attempt at spill-to-disk, it died right at the beginning of the run, as soon as the graph data was loaded and the messaging started, same as without it. My explanation in the previous comment was about how the amortizing code died, sorry. Anyway, from your explanation it sounds like I did not have it set up right if it was doing that. I am excited to try it again.

                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Giraph QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454079#comment-13454079 ] 

Giraph QA commented on GIRAPH-322:
----------------------------------

The above pre-commit message was triggered manually by me. A few days ago I sent a request for the pre-commit hook to builds@a.o, but it has not been handled yet.
                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-2.patch

Forgot some generics in a declaration. So far in testing this seems to work great, but at the waitAllRequests() barrier at the end of the first messaging step, workers that have just flushed their last messages out don't leave the barrier. I think I have a bug in my Request wiring.

If anyone takes a peek and notices problems, obvious bugs, or typos, please let me know; I need to get this running to completion (or crashing) to make sure it's a good idea at all! So far what I have seen is just what I was hoping for... but now it has to get past the first superstep ;)

                

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13451370#comment-13451370 ] 

Eli Reisman commented on GIRAPH-322:
------------------------------------

When I run the instrumented copy, everything runs great and messages get where they are going until the very last flush of the leftovers at the end of superstep 0, where we hit NettyWorkerClient#waitAllRequests. In here, I see a continual loop on all the flushing workers where they endlessly try to re-connect with their destinations. The weird thing is that I am not seeing anything to indicate the destinations have crashed. I am very suspicious that my request wiring is not quite right. Will take a deeper look in the next day or two. If anyone sees anything obvious, please let me know.

I will also attempt to try out the disk spill and tune it better, but I will be operating on a smaller setup now, so I will need good results confirmed in the end by those better resourced than myself. ;) But that's a ways off still...

                

[jira] [Updated] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-322:
-------------------------------

    Attachment: GIRAPH-322-3.patch

Thanks Maja, I see now that the reflection constructor is probably the issue, good catch! I agree about the message cache and partition issues; I think there's a big benefit to sending messages "one per worker" and then figuring out which partition for sub-delivery. I was reluctant to change that in this patch since I am new to the messaging code, and since the HashPartitioner provides the CLI option I figured I should try it out that way and explore other solutions in another JIRA, or after Avery says it's OK to hack the existing solution ;)

This is also why the message/destination data gets split up after being grouped, as you mentioned. It always has to be grouped by partition (with message dupes sent individually per partition) because all messages on both sides are organized around partitions. This is good if partitions get swapped in the middle of a superstep, or if a worker dies and we have a restart story, but for this use case it does add some extra work and memory for no gain. As long as the partitioning is set to one per worker, we end up doing some extra work but not sending extra messages, which is good enough for now.

We should bounce around some ideas about this though; it's not an ideal situation.
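
The "one per worker" grouping discussed above can be sketched like this. Again a hypothetical illustration: the round-robin partition-to-worker mapping and all names are assumptions, not Giraph's actual assignment logic. Grouping destination partitions by owning worker means the payload crosses the network once per worker, and the receiving worker fans it out to its local partitions.

```java
import java.util.*;

// Hypothetical sketch of the "one message per worker" idea: group the
// destination partitions by owning worker, send the payload once per worker,
// and let each receiving worker fan it out to its local partitions.
public class PerWorkerFanout {
    static Map<Integer, List<Integer>> partitionsByWorker(
            Collection<Integer> destinationPartitions, int workerCount) {
        // Assumes partitions are assigned round-robin to workers.
        Map<Integer, List<Integer>> byWorker = new TreeMap<>();
        for (int partition : destinationPartitions) {
            int worker = partition % workerCount;
            byWorker.computeIfAbsent(worker, w -> new ArrayList<>())
                .add(partition);
        }
        return byWorker;
    }

    public static void main(String[] args) {
        // Six destination partitions spread over three workers: the payload
        // is sent three times (once per worker), not six (once per partition).
        Map<Integer, List<Integer>> plan =
            partitionsByWorker(Arrays.asList(0, 1, 2, 3, 4, 5), 3);
        System.out.println(plan.size()); // payload copies on the wire
    }
}
```

The per-partition grouping inside each worker's entry is what preserves correctness if partitions are swapped mid-superstep, at the cost of the extra bookkeeping noted above.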


[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454074#comment-13454074 ] 

Hadoop QA commented on GIRAPH-322:
----------------------------------

-1 overall.  Here are the results of testing the latest attachment 
  http://issues.apache.org/jira/secure/attachment/12544698/GIRAPH-322-3.patch
  against trunk revision 1383115.

    +1 @author.  The patch does not contain any @author tags.

    -1 tests included.  The patch doesn't appear to include any new or modified tests.
                        Please justify why no new tests are needed for this patch.
                        Also please list what manual steps were performed to verify this patch.

    +1 javac.  The applied patch does not increase the total number of javac compiler warnings.

    +1 javadoc.  The applied patch does not increase the total number of javadoc warnings.

    +1 checkstyle.  The patch generated 0 code style errors.

    +1 findbugs.  The patch does not introduce any new Findbugs (version 1.3.9) warnings.

    +1 release audit.  The applied patch does not increase the total number of release audit warnings.

    +1 core tests.  The patch passed unit tests in ..

Test results: https://builds.apache.org/job/PreCommit-GIRAPH-Build/38//testReport/
Console output: https://builds.apache.org/job/PreCommit-GIRAPH-Build/38//console

This message is automatically generated.

[jira] [Commented] (GIRAPH-322) Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of control message growth in large scale jobs

Posted by "Hyunsik Choi (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13454081#comment-13454081 ] 

Hyunsik Choi commented on GIRAPH-322:
-------------------------------------

Sorry for the noise; I wrote the above comment.