You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@tinkerpop.apache.org by "Felix Chapman (JIRA)" <ji...@apache.org> on 2016/10/18 16:26:58 UTC
[jira] [Updated] (TINKERPOP-1519) TinkerGraphComputer doesn't
handle multiple MessageScopes in single iteration
[ https://issues.apache.org/jira/browse/TINKERPOP-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Felix Chapman updated TINKERPOP-1519:
-------------------------------------
Description:
When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration.
The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations.
An example of this behaviour is below:
{code:java}
public class TinkerTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TinkerGraph graph = TinkerGraph.open();
Vertex a = graph.addVertex("a");
Vertex b = graph.addVertex("b");
Vertex c = graph.addVertex("c");
a.addEdge("edge", b);
b.addEdge("edge", c);
// Simple graph:
// a -> b -> c
// Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b"
// then each vertex sums any received messages
ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();
// We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
}
}
class MyVertexProgram implements VertexProgram<Long> {
private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
private static final String MEMORY_KEY = "count";
private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);
@Override
public void setup(final Memory memory) {}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<String> getElementComputeKeys() {
return COMPUTE_KEYS;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
}
@Override
public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
switch (memory.getIteration()) {
case 0:
if (vertex.label().equals("b")) {
messenger.sendMessage(this.countMessageScopeIn, 2L);
messenger.sendMessage(this.countMessageScopeOut, 1L);
}
break;
case 1:
long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
vertex.property(MEMORY_KEY, edgeCount);
break;
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() == 1;
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public MyVertexProgram clone() {
try {
return (MyVertexProgram) super.clone();
} catch (final CloneNotSupportedException e) {
throw new RuntimeException(e);
}
}
}
{code}
was:
When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration.
The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations.
An example of this behaviour is below:
{{
public class TinkerTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TinkerGraph graph = TinkerGraph.open();
Vertex a = graph.addVertex("a");
Vertex b = graph.addVertex("b");
Vertex c = graph.addVertex("c");
a.addEdge("edge", b);
b.addEdge("edge", c);
// Simple graph:
// a -> b -> c
// Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b"
// then each vertex sums any received messages
ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();
// We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
}
}
class MyVertexProgram implements VertexProgram<Long> {
private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
private static final String MEMORY_KEY = "count";
private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);
@Override
public void setup(final Memory memory) {}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<String> getElementComputeKeys() {
return COMPUTE_KEYS;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
}
@Override
public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
switch (memory.getIteration()) {
case 0:
if (vertex.label().equals("b")) {
messenger.sendMessage(this.countMessageScopeIn, 2L);
messenger.sendMessage(this.countMessageScopeOut, 1L);
}
break;
case 1:
long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
vertex.property(MEMORY_KEY, edgeCount);
break;
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() == 1;
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public MyVertexProgram clone() {
try {
return (MyVertexProgram) super.clone();
} catch (final CloneNotSupportedException e) {
throw new RuntimeException(e);
}
}
}
}}
> TinkerGraphComputer doesn't handle multiple MessageScopes in single iteration
> -----------------------------------------------------------------------------
>
> Key: TINKERPOP-1519
> URL: https://issues.apache.org/jira/browse/TINKERPOP-1519
> Project: TinkerPop
> Issue Type: Bug
> Components: tinkergraph
> Affects Versions: 3.1.1-incubating
> Environment: Mac OSX
> Reporter: Felix Chapman
> Priority: Minor
>
> When executing a VertexProgram that sends messages on multiple MessageScopes in a single iteration, then the messages behave as if they were sent on all scopes within that iteration.
> The problem can be resolved by using only a single MessageScope per iteration, but this involves increasing the number of iterations.
> An example of this behaviour is below:
> {code:java}
> public class TinkerTest {
> public static void main(String[] args) throws ExecutionException, InterruptedException {
> TinkerGraph graph = TinkerGraph.open();
> Vertex a = graph.addVertex("a");
> Vertex b = graph.addVertex("b");
> Vertex c = graph.addVertex("c");
> a.addEdge("edge", b);
> b.addEdge("edge", c);
> // Simple graph:
> // a -> b -> c
> // Execute a traversal program that sends an incoming message of "2" and an outgoing message of "1" from "b"
> // then each vertex sums any received messages
> ComputerResult result = graph.compute().program(new MyVertexProgram()).submit().get();
> // We expect the results to be {a=2, b=0, c=1}. Instead it is {a=3, b=0, c=3}
> System.out.println(result.graph().traversal().V().group().by(Element::label).by("count").next());
> }
> }
> class MyVertexProgram implements VertexProgram<Long> {
> private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
> private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
> private static final String MEMORY_KEY = "count";
> private static final Set<String> COMPUTE_KEYS = Collections.singleton(MEMORY_KEY);
> @Override
> public void setup(final Memory memory) {}
> @Override
> public GraphComputer.Persist getPreferredPersist() {
> return GraphComputer.Persist.VERTEX_PROPERTIES;
> }
> @Override
> public Set<String> getElementComputeKeys() {
> return COMPUTE_KEYS;
> }
> @Override
> public Set<MessageScope> getMessageScopes(final Memory memory) {
> return Sets.newHashSet(countMessageScopeIn, countMessageScopeOut);
> }
> @Override
> public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
> switch (memory.getIteration()) {
> case 0:
> if (vertex.label().equals("b")) {
> messenger.sendMessage(this.countMessageScopeIn, 2L);
> messenger.sendMessage(this.countMessageScopeOut, 1L);
> }
> break;
> case 1:
> long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
> vertex.property(MEMORY_KEY, edgeCount);
> break;
> }
> }
> @Override
> public boolean terminate(final Memory memory) {
> return memory.getIteration() == 1;
> }
> @Override
> public GraphComputer.ResultGraph getPreferredResultGraph() {
> return GraphComputer.ResultGraph.NEW;
> }
> @Override
> public MyVertexProgram clone() {
> try {
> return (MyVertexProgram) super.clone();
> } catch (final CloneNotSupportedException e) {
> throw new RuntimeException(e);
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)