Posted to dev@giraph.apache.org by Claudio Martella <cl...@gmail.com> on 2012/01/11 00:14:50 UTC

on the thread-safety of graph.partition

Hi,

As my tests all fail around the same code, I have to assume the problem
is there :)
The code is quite simple, though, and the failures happen for different
reasons (one is even a divide by zero).

Basically, I've refactored BasicRPCCommunications.putMsg*() and
putVertexIdMessagesList(). Like the previous code, I need to collect the
set of vertexIds of messages directed to vertices that don't exist in
the current partition, as they are going to be resolved later
(resolveVertexIndexSet in the trunk code). I changed this code because
it depends on inMessages being a mutable hashmap, which is not the case
with out-of-core SequenceFiles.

Basically, the trunk code does:
for (partition in partitions)
    for (vertex in partition.vertices)
        messages = inMessages.remove(vertex)
        [...]


leaving inMessages at the end of the for loops with just the messages
directed to the aforementioned soon-to-be-created vertices.
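
A minimal sketch of that trunk strategy, to make the "leftover" idea concrete. It is not the actual Giraph code: the class name, the String ids, and the plain Set/Map types are hypothetical stand-ins for the real partition and inMessages structures.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TrunkLeftoverSketch {
    // Hypothetical stand-in for the trunk loop: drain the messages of every
    // vertex that exists; the keys left in inMessages are the ids of
    // vertices that do not exist yet and must be resolved later.
    static Set<String> drainAndFindUnresolved(
            List<Set<String>> partitions,
            Map<String, List<String>> inMessages) {
        for (Set<String> partition : partitions) {
            for (String vertexId : partition) {
                // Requires a mutable map: remove() is what shrinks inMessages.
                List<String> messages = inMessages.remove(vertexId);
                // [...] deliver messages to the vertex (elided as above)
            }
        }
        // Copy the remaining keys so the caller gets a stable, sorted view.
        return new TreeSet<String>(inMessages.keySet());
    }
}
```

The sketch only works because remove() mutates the map, which is exactly the assumption that breaks with an immutable message store.
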
As this strategy doesn't work out for me, I changed the putMsg*
methods to do the check online, as each message arrives, in something
like this:

    @Override
    public final void putMsg(I vertex, M msg) throws IOException {
        if (LOG.isDebugEnabled()) {
            LOG.debug("putMsg: Adding msg " + msg + " on vertex " + vertex);
        }
        checkForMessageToNonExistentVertex(vertex);
        inMessages.addMessage(vertex, msg);
    }

where:

    private void checkForMessageToNonExistentVertex(I vertexId) {
        // Populate resolveVertexIndexSet in case a message was sent
        // to a non-existent vertex. It will be taken care of later by
        // vertexResolver at prepareSuperstep()
        Partition<I, V, E, M> partition;
        if ((partition = service.getPartition(vertexId)) == null) {
            throw new IllegalStateException(
                "Impossible that this worker " +
                service.getWorkerInfo() + " was sent " +
                blablablablab...;
        } else {
            if (partition.getVertex(vertexId) == null) {
                synchronized(nonExistentVerticesIndexSet) {
                    nonExistentVerticesIndexSet.add(vertexId);
                }
            }
        }
    }


so I can collect the vertexIds as they come, without searching for them
later (which I actually cannot do). Basically, instead of building
resolveVertexIds at the end, I spread the work over the handling of
incoming messages.
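
To show the shape of that online collection in isolation, here is a small sketch under stated assumptions. NonExistentVertexCollector and the vertexExists predicate are hypothetical names, not Giraph API; the predicate stands in for the service.getPartition(...).getVertex(...) lookup. A concurrent set replaces the synchronized block, but that only protects the set itself and says nothing about whether the lookup behind the predicate is thread-safe.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

public class NonExistentVertexCollector<I> {
    // Concurrent set: add() may be called from several RPC handler threads.
    private final Set<I> nonExistentVertices =
        Collections.newSetFromMap(new ConcurrentHashMap<I, Boolean>());

    // Hypothetical stand-in for the partition/vertex lookup; it must be
    // safe to call concurrently for this class to be safe as a whole.
    private final Predicate<I> vertexExists;

    public NonExistentVertexCollector(Predicate<I> vertexExists) {
        this.vertexExists = vertexExists;
    }

    // Called once per incoming message, before the message is stored.
    public void check(I vertexId) {
        if (!vertexExists.test(vertexId)) {
            nonExistentVertices.add(vertexId);
        }
    }

    // Copy of the ids collected so far, e.g. for use at prepareSuperstep().
    public Set<I> snapshot() {
        return new HashSet<I>(nonExistentVertices);
    }
}
```

Duplicate ids are absorbed by the set, so a vertex that receives many messages is recorded only once.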

As all my failures happen in this last method (once because
hashPartitioner divides by zero, once because it returns null for a
vertex it actually owns, etc.), and given that I haven't changed any
code connected with partitioning whatsoever, I guess it must be due to
a thread-safety problem in this code.

So my questions are:

1) Can I safely access service.getPartition() and all the stuff
underneath it from inside the RPC code, such as putMsg(), putMsgList(),
etc.?
2) If not, what strategy do you suggest to collect the
nonExistentVerticesIndexSet that I need later on at
prepareSuperstep()?

-- 
   Claudio Martella
   claudio.martella@gmail.com