You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tinkerpop.apache.org by "Felix Chapman (JIRA)" <ji...@apache.org> on 2016/10/18 16:26:58 UTC

[jira] [Updated] (TINKERPOP-1519) TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration

     [ https://issues.apache.org/jira/browse/TINKERPOP-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Felix Chapman updated TINKERPOP-1519:
-------------------------------------
    Description: 
When a VertexProgram sends messages on multiple MessageScopes in a single iteration, every message is delivered as if it had been sent on all of the scopes used in that iteration.

The problem can be worked around by using only a single MessageScope per iteration, but this increases the number of iterations required.

An example of this behaviour is below:

{code:java}
public class TinkerTest {

    /**
     * Builds the three-vertex chain, runs {@link MyVertexProgram} over it and prints the
     * "count" property of the result graph's vertices, grouped by vertex label.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final TinkerGraph chain = buildChain();

        // Run a vertex program in which "b" sends an incoming message of "2" and an outgoing message of "1",
        // after which every vertex sums the messages it received
        final ComputerResult outcome = chain.compute().program(new MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
        final Object countsByLabel = outcome.graph().traversal().V().group().by(Element::label).by("count").next();
        System.out.println(countsByLabel);
    }

    /**
     * Creates the simple graph used by the reproduction:
     * a -> b -> c
     */
    private static TinkerGraph buildChain() {
        final TinkerGraph graph = TinkerGraph.open();
        final Vertex head = graph.addVertex("a");
        final Vertex middle = graph.addVertex("b");
        final Vertex tail = graph.addVertex("c");
        head.addEdge("edge", middle);
        middle.addEdge("edge", tail);
        return graph;
    }
}

/**
 * Two-iteration vertex program used to reproduce TINKERPOP-1519.
 *
 * <p>In iteration 0 the vertex labelled "b" sends {@code 2L} on the scope built from
 * {@code __::inE} and {@code 1L} on the scope built from {@code __::outE}. In iteration 1
 * every vertex sums the messages it received and stores the total in its "count" property.
 */
class MyVertexProgram implements VertexProgram<Long> {

    /** Key of the vertex property that receives the summed messages. */
    private static final String COUNT_KEY = "count";

    private static final Set<String> ELEMENT_KEYS = Collections.singleton(COUNT_KEY);

    private final MessageScope.Local<Long> inScope = MessageScope.Local.of(__::inE);
    private final MessageScope.Local<Long> outScope = MessageScope.Local.of(__::outE);

    /** Nothing to initialise in memory. */
    @Override
    public void setup(final Memory memory) {}

    /**
     * Iteration 0: only the vertex labelled "b" sends, one message per scope.
     * Iteration 1: every vertex folds its received messages into a sum and writes it to "count".
     * Any other iteration does nothing.
     */
    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
        final int round = memory.getIteration();
        if (round == 0) {
            if (!vertex.label().equals("b")) {
                return;
            }
            messenger.sendMessage(this.inScope, 2L);
            messenger.sendMessage(this.outScope, 1L);
        } else if (round == 1) {
            final long received =
                    IteratorUtils.reduce(messenger.receiveMessages(), 0L, (sum, message) -> sum + message);
            vertex.property(COUNT_KEY, received);
        }
    }

    /** Stops once the iteration counter reads 1. */
    @Override
    public boolean terminate(final Memory memory) {
        final boolean reachedLastRound = 1 == memory.getIteration();
        return reachedLastRound;
    }

    /** Declares both scopes, regardless of the current iteration. */
    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        final Set<MessageScope> scopes = Sets.newHashSet(inScope, outScope);
        return scopes;
    }

    @Override
    public Set<String> getElementComputeKeys() {
        return ELEMENT_KEYS;
    }

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    /** Copies this program via {@code super.clone()}, rethrowing a clone failure unchecked. */
    @Override
    public MyVertexProgram clone() {
        final Object copy;
        try {
            copy = super.clone();
        } catch (final CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
        return (MyVertexProgram) copy;
    }

}
{code}

  was:
When a VertexProgram sends messages on multiple MessageScopes in a single iteration, every message is delivered as if it had been sent on all of the scopes used in that iteration.

The problem can be worked around by using only a single MessageScope per iteration, but this increases the number of iterations required.

An example of this behaviour is below:

{{
public class TinkerTest {

    /**
     * Builds the three-vertex chain, runs {@link MyVertexProgram} over it and prints the
     * "count" property of the result graph's vertices, grouped by vertex label.
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final TinkerGraph chain = buildChain();

        // Run a vertex program in which "b" sends an incoming message of "2" and an outgoing message of "1",
        // after which every vertex sums the messages it received
        final ComputerResult outcome = chain.compute().program(new MyVertexProgram()).submit().get();

        // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
        final Object countsByLabel = outcome.graph().traversal().V().group().by(Element::label).by("count").next();
        System.out.println(countsByLabel);
    }

    /**
     * Creates the simple graph used by the reproduction:
     * a -> b -> c
     */
    private static TinkerGraph buildChain() {
        final TinkerGraph graph = TinkerGraph.open();
        final Vertex head = graph.addVertex("a");
        final Vertex middle = graph.addVertex("b");
        final Vertex tail = graph.addVertex("c");
        head.addEdge("edge", middle);
        middle.addEdge("edge", tail);
        return graph;
    }
}

/**
 * Two-iteration vertex program used to reproduce TINKERPOP-1519.
 *
 * <p>In iteration 0 the vertex labelled "b" sends {@code 2L} on the scope built from
 * {@code __::inE} and {@code 1L} on the scope built from {@code __::outE}. In iteration 1
 * every vertex sums the messages it received and stores the total in its "count" property.
 */
class MyVertexProgram implements VertexProgram<Long> {

    /** Key of the vertex property that receives the summed messages. */
    private static final String COUNT_KEY = "count";

    private static final Set<String> ELEMENT_KEYS = Collections.singleton(COUNT_KEY);

    private final MessageScope.Local<Long> inScope = MessageScope.Local.of(__::inE);
    private final MessageScope.Local<Long> outScope = MessageScope.Local.of(__::outE);

    /** Nothing to initialise in memory. */
    @Override
    public void setup(final Memory memory) {}

    /**
     * Iteration 0: only the vertex labelled "b" sends, one message per scope.
     * Iteration 1: every vertex folds its received messages into a sum and writes it to "count".
     * Any other iteration does nothing.
     */
    @Override
    public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
        final int round = memory.getIteration();
        if (round == 0) {
            if (!vertex.label().equals("b")) {
                return;
            }
            messenger.sendMessage(this.inScope, 2L);
            messenger.sendMessage(this.outScope, 1L);
        } else if (round == 1) {
            final long received =
                    IteratorUtils.reduce(messenger.receiveMessages(), 0L, (sum, message) -> sum + message);
            vertex.property(COUNT_KEY, received);
        }
    }

    /** Stops once the iteration counter reads 1. */
    @Override
    public boolean terminate(final Memory memory) {
        final boolean reachedLastRound = 1 == memory.getIteration();
        return reachedLastRound;
    }

    /** Declares both scopes, regardless of the current iteration. */
    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        final Set<MessageScope> scopes = Sets.newHashSet(inScope, outScope);
        return scopes;
    }

    @Override
    public Set<String> getElementComputeKeys() {
        return ELEMENT_KEYS;
    }

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    /** Copies this program via {@code super.clone()}, rethrowing a clone failure unchecked. */
    @Override
    public MyVertexProgram clone() {
        final Object copy;
        try {
            copy = super.clone();
        } catch (final CloneNotSupportedException e) {
            throw new RuntimeException(e);
        }
        return (MyVertexProgram) copy;
    }

}
}}


> TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration
> -----------------------------------------------------------------------------
>
>                 Key: TINKERPOP-1519
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-1519
>             Project: TinkerPop
>          Issue Type: Bug
>          Components: tinkergraph
>    Affects Versions: 3.1.1-incubating
>         Environment: Mac OSX
>            Reporter: Felix Chapman
>            Priority: Minor
>
> When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration.
> The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations.
> An example of this behaviour is below:
> {code:java}
> public class TinkerTest {
>     public static void main(String[] args) throws ExecutionException, InterruptedException {
>         TinkerGraph graph = TinkerGraph.open();
>         Vertex a = graph.addVertex("a");
>         Vertex b = graph.addVertex("b");
>         Vertex c = graph.addVertex("c");
>         a.addEdge("edge", b);
>         b.addEdge("edge", c);
>         // Simple graph:
>         // a -> b -> c
>         // Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b"
>         // then each vertex sums any received messages
>         ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();
>         // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
>         System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
>     }
> }
> class MyVertexProgram implements VertexProgram<Long> {
>     private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
>     private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
>     private static final String MEMORY_KEY = "count";
>     private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);
>     @Override
>     public void setup(final Memory memory) {}
>     @Override
>     public GraphComputer.Persist getPreferredPersist() {
>         return GraphComputer.Persist.VERTEX_PROPERTIES;
>     }
>     @Override
>     public Set<String> getElementComputeKeys() {
>         return COMPUTE_KEYS;
>     }
>     @Override
>     public Set<MessageScope> getMessageScopes(final Memory memory) {
>         return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
>     }
>     @Override
>     public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
>         switch (memory.getIteration()) {
>             case 0:
>                 if (vertex.label().equals("b")) {
>                     messenger.sendMessage(this.countMessageScopeIn, 2L);
>                     messenger.sendMessage(this.countMessageScopeOut, 1L);
>                 }
>                 break;
>             case 1:
>                 long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
>                 vertex.property(MEMORY_KEY, edgeCount);
>                 break;
>         }
>     }
>     @Override
>     public boolean terminate(final Memory memory) {
>         return memory.getIteration() == 1;
>     }
>     @Override
>     public GraphComputer.ResultGraph getPreferredResultGraph() {
>         return GraphComputer.ResultGraph.NEW;
>     }
>     @Override
>     public MyVertexProgram clone() {
>         try {
>             return (MyVertexProgram) super.clone();
>         } catch (final CloneNotSupportedException e) {
>             throw new RuntimeException(e);
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)