You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@giraph.apache.org by "Eli Reisman (JIRA)" <ji...@apache.org> on 2012/09/14 01:10:07 UTC

[jira] [Created] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Eli Reisman created GIRAPH-328:
----------------------------------

             Summary: Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
                 Key: GIRAPH-328
                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
             Project: Giraph
          Issue Type: Improvement
          Components: bsp, graph
    Affects Versions: 0.2.0
            Reporter: Eli Reisman
            Assignee: Eli Reisman
            Priority: Minor
             Fix For: 0.2.0


Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.

By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.

This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.

Comments/ideas/help welcome! Thanks



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Re: [jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by Eli Reisman <ap...@gmail.com>.
Whew! Thats good to know. Digging around in there I had the feeling I might
be going down a blind alley. If you notice anything that is not good for a
final solution, drop me a line and I can start fixing/improving on this
until it starts looking like a real solution.


On Thu, Sep 13, 2012 at 4:26 PM, Avery Ching (JIRA) <ji...@apache.org> wrote:

>
>     [
> https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13455431#comment-13455431]
>
> Avery Ching commented on GIRAPH-328:
> ------------------------------------
>
> This is the right behavior that you suggest.  I was going to do it, but
> glad that you're taking a cut!
>
> > Outgoing messages from current superstep should be grouped at the sender
> by owning worker, not by partition
> >
> -----------------------------------------------------------------------------------------------------------
> >
> >                 Key: GIRAPH-328
> >                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
> >             Project: Giraph
> >          Issue Type: Improvement
> >          Components: bsp, graph
> >    Affects Versions: 0.2.0
> >            Reporter: Eli Reisman
> >            Assignee: Eli Reisman
> >            Priority: Minor
> >             Fix For: 0.2.0
> >
> >         Attachments: GIRAPH-328-1.patch
> >
> >
> > Currently, outgoing messages created by the Vertex#compute() cycle on
> each worker are stored and grouped by the partitionId on the destination
> worker to which the messages belong. This results in messages being
> duplicated on the wire per partition on a given receiving worker that has
> delivery vertices for those messages.
> > By partitioning the outgoing, current-superstep messages by destination
> worker, we can split them into partitions at insertion into a MessageStore
> on the destination worker. What we trade in come compute time while
> inserting at the receiver side, we gain in fine grained control over the
> real number of messages each worker caches outbound for any given worker
> before flushing, and how those flush messages are aggregated for delivery
> as well. Potentially, it allows for a great reduction in duplicate messages
> sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322,
> GIRAPH-314. You get the idea.
> > This might be a poor idea, and it can certainly use some additional
> refinement, but it passes mvn verify and may even run ;) It interoperates
> with the disk spill code, but not as well as it could. Consider this a
> request for comment on the idea (and the approach) rather than a finished
> product.
> > Comments/ideas/help welcome! Thanks
>
> --
> This message is automatically generated by JIRA.
> If you think it was sent incorrectly, please contact your JIRA
> administrators
> For more information on JIRA, see: http://www.atlassian.com/software/jira
>

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Hudson (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468286#comment-13468286 ] 

Hudson commented on GIRAPH-328:
-------------------------------

Integrated in Giraph-trunk-Commit #217 (See [https://builds.apache.org/job/Giraph-trunk-Commit/217/])
    GIRAPH-328: Outgoing messages from current superstep should be grouped
at the sender by owning worker, not by partition. (Eli Reisman via
aching) (Revision 1393266)

     Result = FAILURE
aching : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1393266
Files : 
* /giraph/trunk/CHANGELOG
* /giraph/trunk/src/main/java/org/apache/giraph/comm/SendMessageCache.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClient.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/requests/RequestType.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendPartitionMessagesRequest.java
* /giraph/trunk/src/main/java/org/apache/giraph/comm/requests/SendWorkerMessagesRequest.java
* /giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
* /giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerInfo.java
* /giraph/trunk/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
* /giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch, GIRAPH-328.8.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13465869#comment-13465869 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

Hi Avery, thanks for the review and for finding those uses of BspUtils that snuck past me. When I rebased this for v5 I thought I had caught them all. I'll fix the stuff you mentioned and post a patch here.

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13464463#comment-13464463 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

Thanks for keeping on this [~initialcontext]!  I put up a review with some comments.  This is almost there I think!

https://reviews.apache.org/r/7308/
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-7.patch

Fixed a couple of things. All the rest still applies ;)
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-5.patch

Rebased, fixed errors, passes 'mvn verify'
 etc.

Ready for review.


                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Avery Ching updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328.8.patch

Here's the fixed patch from my comments on reviewboard.  Let me know what you think Eli!
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch, GIRAPH-328.8.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13468285#comment-13468285 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

Committed, thanks for the great work Eli.
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch, GIRAPH-328.8.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13461933#comment-13461933 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

Will do!

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-1.patch
    
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-6.patch
    
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13467238#comment-13467238 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

+1, then, please commit under with your name as the credit for the patch.
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch, GIRAPH-328.8.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-4.patch

Just a placeholder. Nearly done with the changes Avery asked for, I think I may have also fixed the partition-lookup bottleneck he mentioned on the way. However, when I don't look up partitions on the request receiver's end, I get a failure in the RequestFailureTest(s):

Anyone know what the deal is? I'll keep working on this.

{code}
send2Requests(org.apache.giraph.comm.RequestFailureTest)  Time elapsed: 0.591 sec  <<< FAILURE!
java.lang.AssertionError: expected:<70> but was:<0>
	at org.junit.Assert.fail(Assert.java:58)
	at org.junit.Assert.failNotEquals(Assert.java:259)
	at org.junit.Assert.assertEquals(Assert.java:80)
	at org.junit.Assert.assertEquals(Assert.java:88)
	at org.apache.giraph.comm.RequestFailureTest.checkResult(RequestFailureTest.java:137)
	at org.apache.giraph.comm.RequestFailureTest.send2Requests(RequestFailureTest.java:165)
{code}
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13462361#comment-13462361 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

Dammit. thats what i thought. Thanks!

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13465926#comment-13465926 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

Hi Avery, I'm posting a new patch (v6) that fixes the things you mentioned on review board. This patch doesn't build as there is still a problem with the GiraphConfiguration calls to create my I and M values. Here's the error I get. This is why I think I used BspUtils in the previous patch. The error dump is this (below). I will check this later when I have time but the calls are in this patch if you think I'm calling the new methods wrong let me know.

{code}
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/computer/apache/giraph/target/munged/main/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java:[80,45] incompatible types
found   : org.apache.hadoop.io.WritableComparable
required: I
[ERROR] /home/computer/apache/giraph/target/munged/main/org/apache/giraph/comm/requests/SendPartitionCurrentMessagesRequest.java:[87,52] incompatible types
found   : org.apache.hadoop.io.Writable
required: M
[INFO] 2 errors 
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.786s
[INFO] Finished at: Fri Sep 28 14:30:36 PDT 2012
[INFO] Final Memory: 17M/155M
[INFO] ------------------------------------------------------------------------
{code}
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-2.patch

Fixed an issue mentioned by Maja that the partitionId() calls in WorkerInfo and PartitionOwner are not refering to the same partition concepts and are therefore confusing. Changed WorkerInfo API for call to getTaskId() to differentiate them better in the code. Otherwise the same, passes mvn verify etc.

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eli Reisman updated GIRAPH-328:
-------------------------------

    Attachment: GIRAPH-328-3.patch

A few small changes to make the solution more robust and ready for future mods/upgrades. Passes mvn verify, should be ready for initial review I think?

I tried not to make too many sweeping changes to how messaging works but simply to store outgoing I -> List<M> grouped by WorkerInfo (actual destintion address) rather than partitionId. The partitionId plumbing is left intact in this solution as some of the code related to dynamic repartitioning etc. uses it. I am curious if it would matter if we got rid of the map of partitionId -> InetSocketAddress used by partition code since it is not used that much, and the "service" in NettyWorkerClient can get this from vertexId -> PartitionOwner mapping it carries. Perhaps the partitionId -> InetSocketAddress is really just extra plumbing at this point?

anyway, let me know what should be done to improve this (or fix bugs!), otherwise I think its ready to go and/or other improvements could be in additional JIRAs. Will do some additional testing ASAP as well.

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13461953#comment-13461953 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

About NettyWorkerClient line 179, the USE_WORKERINFO_ADDRESS is a flag to check the partitionId input arg against. I use it to let parts of the message-send plumbing signal to use their WorkerInfo for the remote address rather than the partitionId since (in its current form) they are not using a partitionId and don't know it any more at that point. The only reason those parts call getInetSocketAddress() instead of just pulling the address object out of the WorkerInfo directly is that getInetSocketAddress() loops to make sure the address is resolved which is nice. 

If I am changing this patch to keep the partition mapping then I might want to start over with this anyway, there isn't much different from the trunk version in that case. So all this might go away.

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13459091#comment-13459091 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

This is a great stuff Eli.  I'm really happy that you undertook this. =)

I also like the change of getPartitionId to getTaskId for less confusion with Giraph partitions.

NettyWorkerClient.java:179 - Can we separate this into 2 lines?  I'm also confused on whether this appears to be correct (given that USE_WORKERINFO_ADDRESS is final int)

Last, but most important, it would be great to preserve the notion of partitions in the message and the request (i.e. 

private Map<WorkerInfo, Map<Integer, Map<I, Collection<M>>>> messageCache

It is a bit more complex, but on the request processing side, it allows us the ability to not have to look up the partitions when adding the messages to them.  In my work on GIRAPH-329, I find that this improves performance quite a bit and we are limited by our ability to add messages to that map on the receiver.

Otherwise, this looks good.
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13462260#comment-13462260 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

You will have the same error I did.  The tests need to use the correct partition, see my diff (0).  It used to work because we looked up the partition id for every add (ouch).
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Eli Reisman (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13467234#comment-13467234 ] 

Eli Reisman commented on GIRAPH-328:
------------------------------------

I just ran 'mvn verify' on your .8 patch and it was successful, this seems good to go if you like it. Thanks again for all the help!

                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch, GIRAPH-328-2.patch, GIRAPH-328-3.patch, GIRAPH-328-4.patch, GIRAPH-328-5.patch, GIRAPH-328-6.patch, GIRAPH-328-7.patch, GIRAPH-328.8.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (GIRAPH-328) Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition

Posted by "Avery Ching (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13455431#comment-13455431 ] 

Avery Ching commented on GIRAPH-328:
------------------------------------

This is the right behavior that you suggest.  I was going to do it, but glad that you're taking a cut!
                
> Outgoing messages from current superstep should be grouped at the sender by owning worker, not by partition
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-328
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-328
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp, graph
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-328-1.patch
>
>
> Currently, outgoing messages created by the Vertex#compute() cycle on each worker are stored and grouped by the partitionId on the destination worker to which the messages belong. This results in messages being duplicated on the wire per partition on a given receiving worker that has delivery vertices for those messages.
> By partitioning the outgoing, current-superstep messages by destination worker, we can split them into partitions at insertion into a MessageStore on the destination worker. What we trade in come compute time while inserting at the receiver side, we gain in fine grained control over the real number of messages each worker caches outbound for any given worker before flushing, and how those flush messages are aggregated for delivery as well. Potentially, it allows for a great reduction in duplicate messages sent in situations like Vertex#sendMessageToAllEdges() -- see GIRAPH-322, GIRAPH-314. You get the idea.
> This might be a poor idea, and it can certainly use some additional refinement, but it passes mvn verify and may even run ;) It interoperates with the disk spill code, but not as well as it could. Consider this a request for comment on the idea (and the approach) rather than a finished product.
> Comments/ideas/help welcome! Thanks

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira