You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@giraph.apache.org by "Sebastian Schelter (Created) (JIRA)" <ji...@apache.org> on 2011/12/21 15:27:30 UTC

[jira] [Created] (GIRAPH-114) Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor

Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
-------------------------------------------------------------------------------------

                 Key: GIRAPH-114
                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
             Project: Giraph
          Issue Type: Bug
    Affects Versions: 0.70.0
            Reporter: Sebastian Schelter
            Priority: Critical
         Attachments: GIRAPH-114.patch

I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in a local IDE unit tests on toy data and in a local single node hadoop instance using a graph of ~100k edges.

When I tested it on a real cluster with the wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:

{noformat} 
2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
	at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
	... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
	... 10 more
Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
	at java.lang.Thread.run(Thread.java:662)
{noformat} 

The exception is thrown because a vertex with no message to send to is found in the datastructure holding the outgoing messages.

I tracked this behavior down:


In *BasicRPCCommunications:541-546* the map holding the outgoing messages for vertices of a particular machine is created. It's stored in two places _BasicRPCCommunications.outMessages_ and as member variable _outMessagesPerPeer_ of its _PeerConnection_ :

{noformat} 
outMsgMap = new HashMap<I, MsgList<M>>();
outMessages.put(addrUnresolved, outMsgMap);

PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
{noformat} 
	
In case that there are a lot of messages available for a particular vertex, a large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only happened in the wikipedia test). During this flush the list of messages for the vertex is sent out and replaced with an empty list in *BasicRPCCommunications:341*

{noformat}
outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
{noformat}

Now in the last flush that is trigggered at the end of the superstep we encounter an empty message list for the vertex and therefore the exception is thrown in *BasicRPCCommunications:228-247*

{noformat}
for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
...
  if (entry.getValue().isEmpty()) {
    throw new IllegalStateException(...);
}
{noformat}

Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).

I'd like to note that it is generally very dangerous to let different classes have access to a datastructure directly and it produces subtle bugs like this. It would be better to think of a centralized way of handling the datastructure. 



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (GIRAPH-114) Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor

Posted by "Hudson (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13174293#comment-13174293 ] 

Hudson commented on GIRAPH-114:
-------------------------------

Integrated in Giraph-trunk-Commit #57 (See [https://builds.apache.org/job/Giraph-trunk-Commit/57/])
    GIRAPH-114: Inconsistent message map handling in
BasicRPCCommunications.LargeMessageFlushExecutor. (ssc via aching)

aching : http://svn.apache.org/viewcvs.cgi/?root=Apache-SVN&view=rev&rev=1221836
Files : 
* /incubator/giraph/trunk/CHANGELOG
* /incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java

                
> Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
> -------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-114
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
>             Project: Giraph
>          Issue Type: Bug
>    Affects Versions: 0.70.0
>            Reporter: Sebastian Schelter
>            Priority: Critical
>         Attachments: GIRAPH-114.patch
>
>
> I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in a local IDE unit tests on toy data and in a local single node hadoop instance using a graph of ~100k edges.
> When I tested it on a real cluster with the wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:
> {noformat} 
> 2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> 	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:396)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> 	at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
> 	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
> 	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
> 	... 7 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
> 	... 10 more
> Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat} 
> The exception is thrown because a vertex with no message to send to is found in the datastructure holding the outgoing messages.
> I tracked this behavior down:
> In *BasicRPCCommunications:541-546* the map holding the outgoing messages for vertices of a particular machine is created. It's stored in two places _BasicRPCCommunications.outMessages_ and as member variable _outMessagesPerPeer_ of its _PeerConnection_ :
> {noformat} 
> outMsgMap = new HashMap<I, MsgList<M>>();
> outMessages.put(addrUnresolved, outMsgMap);
> PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
> {noformat} 
> 	
> In case that there are a lot of messages available for a particular vertex, a large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only happened in the wikipedia test). During this flush the list of messages for the vertex is sent out and replaced with an empty list in *BasicRPCCommunications:341*
> {noformat}
> outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
> peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
> {noformat}
> Now in the last flush that is trigggered at the end of the superstep we encounter an empty message list for the vertex and therefore the exception is thrown in *BasicRPCCommunications:228-247*
> {noformat}
> for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
> ...
>   if (entry.getValue().isEmpty()) {
>     throw new IllegalStateException(...);
> }
> {noformat}
> Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).
> I'd like to note that it is generally very dangerous to let different classes have access to a datastructure directly and it produces subtle bugs like this. It would be better to think of a centralized way of handling the datastructure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (GIRAPH-114) Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor

Posted by "Sebastian Schelter (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sebastian Schelter updated GIRAPH-114:
--------------------------------------

    Attachment: GIRAPH-114.patch
    
> Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
> -------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-114
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
>             Project: Giraph
>          Issue Type: Bug
>    Affects Versions: 0.70.0
>            Reporter: Sebastian Schelter
>            Priority: Critical
>         Attachments: GIRAPH-114.patch
>
>
> I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in a local IDE unit tests on toy data and in a local single node hadoop instance using a graph of ~100k edges.
> When I tested it on a real cluster with the wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:
> {noformat} 
> 2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> 	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:396)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> 	at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
> 	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
> 	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
> 	... 7 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
> 	... 10 more
> Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat} 
> The exception is thrown because a vertex with no message to send to is found in the datastructure holding the outgoing messages.
> I tracked this behavior down:
> In *BasicRPCCommunications:541-546* the map holding the outgoing messages for vertices of a particular machine is created. It's stored in two places _BasicRPCCommunications.outMessages_ and as member variable _outMessagesPerPeer_ of its _PeerConnection_ :
> {noformat} 
> outMsgMap = new HashMap<I, MsgList<M>>();
> outMessages.put(addrUnresolved, outMsgMap);
> PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
> {noformat} 
> 	
> In case that there are a lot of messages available for a particular vertex, a large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only happened in the wikipedia test). During this flush the list of messages for the vertex is sent out and replaced with an empty list in *BasicRPCCommunications:341*
> {noformat}
> outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
> peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
> {noformat}
> Now in the last flush that is trigggered at the end of the superstep we encounter an empty message list for the vertex and therefore the exception is thrown in *BasicRPCCommunications:228-247*
> {noformat}
> for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
> ...
>   if (entry.getValue().isEmpty()) {
>     throw new IllegalStateException(...);
> }
> {noformat}
> Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).
> I'd like to note that it is generally very dangerous to let different classes have access to a datastructure directly and it produces subtle bugs like this. It would be better to think of a centralized way of handling the datastructure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (GIRAPH-114) Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor

Posted by "Avery Ching (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/GIRAPH-114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13174283#comment-13174283 ] 

Avery Ching commented on GIRAPH-114:
------------------------------------

+1, nice find!  The whole RPC thing is a bit messy now, agreed.
                
> Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
> -------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-114
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
>             Project: Giraph
>          Issue Type: Bug
>    Affects Versions: 0.70.0
>            Reporter: Sebastian Schelter
>            Priority: Critical
>         Attachments: GIRAPH-114.patch
>
>
> I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in a local IDE unit tests on toy data and in a local single node hadoop instance using a graph of ~100k edges.
> When I tested it on a real cluster with the wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:
> {noformat} 
> 2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> 	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:396)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> 	at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
> 	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
> 	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
> 	... 7 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
> 	... 10 more
> Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat} 
> The exception is thrown because a vertex with no message to send to is found in the datastructure holding the outgoing messages.
> I tracked this behavior down:
> In *BasicRPCCommunications:541-546* the map holding the outgoing messages for vertices of a particular machine is created. It's stored in two places _BasicRPCCommunications.outMessages_ and as member variable _outMessagesPerPeer_ of its _PeerConnection_ :
> {noformat} 
> outMsgMap = new HashMap<I, MsgList<M>>();
> outMessages.put(addrUnresolved, outMsgMap);
> PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
> {noformat} 
> 	
> In case that there are a lot of messages available for a particular vertex, a large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only happened in the wikipedia test). During this flush the list of messages for the vertex is sent out and replaced with an empty list in *BasicRPCCommunications:341*
> {noformat}
> outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
> peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
> {noformat}
> Now in the last flush that is trigggered at the end of the superstep we encounter an empty message list for the vertex and therefore the exception is thrown in *BasicRPCCommunications:228-247*
> {noformat}
> for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
> ...
>   if (entry.getValue().isEmpty()) {
>     throw new IllegalStateException(...);
> }
> {noformat}
> Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).
> I'd like to note that it is generally very dangerous to let different classes have access to a datastructure directly and it produces subtle bugs like this. It would be better to think of a centralized way of handling the datastructure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Assigned] (GIRAPH-114) Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor

Posted by "Avery Ching (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/GIRAPH-114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Avery Ching reassigned GIRAPH-114:
----------------------------------

    Assignee: Sebastian Schelter
    
> Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
> -------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-114
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
>             Project: Giraph
>          Issue Type: Bug
>    Affects Versions: 0.70.0
>            Reporter: Sebastian Schelter
>            Assignee: Sebastian Schelter
>            Priority: Critical
>         Attachments: GIRAPH-114.patch
>
>
> I'm currently implementing a simple algorithm to identify all the connected components of a graph. The algorithm ran well in a local IDE unit tests on toy data and in a local single node hadoop instance using a graph of ~100k edges.
> When I tested it on a real cluster with the wikipedia pagelink graph (5.7M vertices, 130M edges), I ran into strange exceptions like this:
> {noformat} 
> 2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
> 	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> 	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> 	at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:396)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> 	at org.apache.hadoop.mapred.Child.main(Child.java:253)
> Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
> 	at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
> 	at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
> 	at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
> 	... 7 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:83)
> 	at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
> 	... 10 more
> Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
> 	at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> {noformat} 
> The exception is thrown because a vertex with no message to send to is found in the datastructure holding the outgoing messages.
> I tracked this behavior down:
> In *BasicRPCCommunications:541-546* the map holding the outgoing messages for vertices of a particular machine is created. It's stored in two places _BasicRPCCommunications.outMessages_ and as member variable _outMessagesPerPeer_ of its _PeerConnection_ :
> {noformat} 
> outMsgMap = new HashMap<I, MsgList<M>>();
> outMessages.put(addrUnresolved, outMsgMap);
> PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
> {noformat} 
> 	
> In case that there are a lot of messages available for a particular vertex, a large flush is trigged via _LargeMessageFlushExecutor_ (I guess this only happened in the wikipedia test). During this flush the list of messages for the vertex is sent out and replaced with an empty list in *BasicRPCCommunications:341*
> {noformat}
> outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
> peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
> {noformat}
> Now in the last flush that is trigggered at the end of the superstep we encounter an empty message list for the vertex and therefore the exception is thrown in *BasicRPCCommunications:228-247*
> {noformat}
> for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
> ...
>   if (entry.getValue().isEmpty()) {
>     throw new IllegalStateException(...);
> }
> {noformat}
> Simply removing the list for the vertex when executing the large flush solved the issue (patch to come).
> I'd like to note that it is generally very dangerous to let different classes have access to a datastructure directly and it produces subtle bugs like this. It would be better to think of a centralized way of handling the datastructure. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira