You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/13 18:56:55 UTC
[39/50] [abbrv] tinkerpop git commit: what a day. So I have Akka
remoting working (not fully). We now have akka.io.GryoSerializer :). And
TraversalActorProgram is smart about detach() and attach(). Learned a bunch
about how to do Partitioner/Partitions i
what a day. So I have Akka remoting working (not fully). We now have akka.io.GryoSerializer :). And TraversalActorProgram is smart about detach() and attach(). Learned a bunch about how to do Partitioner/Partitions in TinkerPop.... stuff is coming along nicely. Need a break though...been coding for 4 hours straight.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/c440326c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/c440326c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/c440326c
Branch: refs/heads/TINKERPOP-1564
Commit: c440326c2488230609f06606dc1b6ee52ab6fc54
Parents: 9b0f4a8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 11 14:38:13 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700
----------------------------------------------------------------------
akka-gremlin/pom.xml | 11 +++
.../akka/process/actors/AkkaGraphActors.java | 8 +-
.../akka/process/actors/MasterActor.java | 6 +-
.../akka/process/actors/io/GryoSerializer.java | 90 ++++++++++++++++++++
.../src/main/resources/application.conf | 40 +++++++--
.../akka/process/actors/AkkaPlayTest.java | 11 +--
.../gremlin/process/actors/ActorProgram.java | 14 +++
.../actors/traversal/TraversalActorProgram.java | 6 +-
.../traversal/TraversalMasterProgram.java | 15 +++-
.../traversal/TraversalWorkerProgram.java | 37 +++++---
.../traversal/message/BarrierAddMessage.java | 8 +-
.../traversal/message/SideEffectAddMessage.java | 8 +-
.../tinkerpop/gremlin/structure/Partition.java | 4 +-
.../gremlin/structure/util/Attachable.java | 37 +++++++-
.../util/partitioner/GlobalPartitioner.java | 13 ++-
.../util/config/SerializableConfiguration.java | 77 +++++++++++++++++
16 files changed, 347 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/akka-gremlin/pom.xml b/akka-gremlin/pom.xml
index f88ec7d..daebdfe 100644
--- a/akka-gremlin/pom.xml
+++ b/akka-gremlin/pom.xml
@@ -47,6 +47,17 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-remote_2.11</artifactId>
+ <version>2.4.14</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 3bd5fa6..acc06ff 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -37,11 +37,11 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -57,8 +57,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
private boolean executed = false;
private AkkaGraphActors(final Configuration configuration) {
- this.configuration = new BaseConfiguration();
- ConfigurationUtils.copy(configuration, this.configuration);
+ this.configuration = new SerializableConfiguration(configuration);
this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName());
GraphActorsHelper.configure(this, this.configuration);
}
@@ -98,7 +97,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
stream().
map(Class::getCanonicalName).
collect(Collectors.toList()).toString()));
- final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
+ final ActorSystem system = ActorSystem.create("traversal", config);
final ActorsResult<R> result = new DefaultActorsResult<>();
final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers);
try {
@@ -126,5 +125,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
public static AkkaGraphActors open() {
return new AkkaGraphActors(new BaseConfiguration());
}
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 97951a8..b9c30bf 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -21,9 +21,12 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
import akka.actor.AbstractActor;
import akka.actor.ActorSelection;
+import akka.actor.AddressFromURIString;
+import akka.actor.Deploy;
import akka.actor.Props;
import akka.dispatch.RequiresMessageQueue;
import akka.japi.pf.ReceiveBuilder;
+import akka.remote.RemoteScope;
import org.apache.tinkerpop.gremlin.process.actors.Actor;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
@@ -61,9 +64,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
this.workers = new ArrayList<>();
final List<Partition> partitions = partitioner.getPartitions();
for (final Partition partition : partitions) {
+ akka.actor.Address addr = AddressFromURIString.parse("akka.tcp://traversal@127.0.0.1:2552");
final String workerPathString = "worker-" + partition.id();
this.workers.add(new Address.Worker(workerPathString, partition.location()));
- context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
+ context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner).withDeploy(new Deploy(new RemoteScope(addr))), workerPathString);
}
this.masterProgram = program.createMasterProgram(this);
receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
new file mode 100644
index 0000000..ab2b16a
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors.io;
+
+import akka.serialization.Serializer;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import org.apache.tinkerpop.shaded.kryo.io.Input;
+import org.apache.tinkerpop.shaded.kryo.io.Output;
+import scala.Option;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GryoSerializer implements Serializer {
+
+ private final GryoPool gryoPool;
+
+ public GryoSerializer() {
+ this.gryoPool = GryoPool.build().
+ poolSize(100).
+ initializeMapper(builder ->
+ builder.referenceTracking(true).
+ registrationRequired(true).
+ addCustom(
+ StartMessage.class,
+ BarrierAddMessage.class,
+ SideEffectAddMessage.class)).create();
+ }
+
+ @Override
+ public int identifier() {
+ return 0;
+ }
+
+ @Override
+ public byte[] toBinary(final Object object) {
+ final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ final Output output = new Output(outputStream);
+ this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object));
+ output.flush();
+ return outputStream.toByteArray();
+ }
+
+ @Override
+ public boolean includeManifest() {
+ return true;
+ }
+
+ @Override
+ public Object fromBinary(byte[] bytes, Option<Class<?>> option) {
+ return option.isEmpty() ? this.fromBinary(bytes) : this.fromBinary(bytes, option.get());
+ }
+
+ @Override
+ public Object fromBinary(byte[] bytes) {
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ final Input input = new Input(inputStream);
+ return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input));
+ }
+
+ @Override
+ public Object fromBinary(byte[] bytes, Class<?> aClass) {
+ final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+ final Input input = new Input(inputStream);
+ return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 7ee599a..393881a 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -1,15 +1,43 @@
+akka {
+ log-dead-letters-during-shutdown = "false"
+}
+
+custom-dispatcher-mailbox {
+ mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
+}
+
custom-dispatcher {
mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics"
}
-akka.actor.mailbox.requirements {
- "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+akka.actor {
+ provider = local
+ serialize-messages = off
+ serializers {
+ gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+ }
+ serialization-bindings {
+ "org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage" = gryo
+ "org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage" = gryo
+ "org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram" = gryo
+ }
+ mailbox.requirements {
+ "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+ }
}
-akka {
- log-dead-letters-during-shutdown = "false"
+akka.remote {
+ enabled-transports = ["akka.remote.netty.tcp"]
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2552
+ }
}
-custom-dispatcher-mailbox {
- mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
+akka.cluster {
+ seed-nodes = [
+ "akka.tcp://traversal@127.0.0.1:2551",
+ "akka.tcp://traversal@127.0.0.1:2552"]
+
+ auto-down-unreachable-after = 10s
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
index d4562eb..c95f336 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors;
import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.Ignore;
@@ -30,6 +31,7 @@ import org.junit.Test;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -42,12 +44,11 @@ public class AkkaPlayTest {
final Graph graph = TinkerGraph.open();
graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3));
- // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+ // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+
+ System.out.println(g.V().groupCount().by(T.label).toList());
+
- for (int i = 0; i < 1000; i++) {
- if (12l != g.V().union(out(), in()).values("name").count().next())
- System.out.println(i);
- }
//3, 1.9, 1
/*for (int i = 0; i < 10000; i++) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
index b1e3065..e3713ad 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
@@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.process.actors;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Optional;
@@ -99,6 +100,19 @@ public interface ActorProgram extends Cloneable {
@SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
public ActorProgram clone();
+ public static <A extends ActorProgram> A createActorProgram(final Graph graph, final Configuration configuration) {
+ try {
+ final Class<A> actorProgramClass = (Class) Class.forName(configuration.getString(ACTOR_PROGRAM));
+ final Constructor<A> constructor = actorProgramClass.getDeclaredConstructor();
+ constructor.setAccessible(true);
+ final A actorProgram = constructor.newInstance();
+ actorProgram.loadState(graph, configuration);
+ return actorProgram;
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
/**
* The Worker program is executed by a worker process in the {@link GraphActors} system.
* There are many workers and a single master.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 484b904..c97ffd7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -44,7 +44,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Repe
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -53,7 +55,7 @@ import java.util.Optional;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class TraversalActorProgram<R> implements ActorProgram {
+public final class TraversalActorProgram<R> implements ActorProgram, Serializable {
public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode";
@@ -68,9 +70,11 @@ public final class TraversalActorProgram<R> implements ActorProgram {
private Traversal.Admin<?, R> traversal;
public TraverserSet<R> result = new TraverserSet<>();
+ private Configuration configuration;
public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
this.traversal = traversal;
+ this.configuration = new SerializableConfiguration(configuration);
final TraversalStrategies strategies = this.traversal.getStrategies().clone();
strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
// TODO: make TinkerGraph/etc. strategies smart about actors
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index e447cdb..796e4c1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -44,6 +44,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import java.util.HashMap;
import java.util.Map;
@@ -144,6 +145,7 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
}
private void processTraverser(final Traverser.Admin traverser) {
+ this.attachTraverser(traverser);
if (traverser.isHalted() || traverser.get() instanceof Element) {
this.sendTraverser(traverser);
} else {
@@ -163,9 +165,9 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
if (traverser.isHalted())
this.results.add(traverser);
else if (traverser.get() instanceof Element)
- this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
+ this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), this.detachTraverser(traverser));
else
- this.master.send(this.master.address(), traverser);
+ this.master.send(this.master.address(), this.detachTraverser(traverser));
}
private void orderBarrier(final Step step) {
@@ -176,4 +178,13 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
barrier.addBarrier(rangingBarrier);
}
}
+
+ private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
+ return true ? traverser : traverser.detach();
+ }
+
+ private void attachTraverser(final Traverser.Admin traverser) {
+ if (false && traverser.get() instanceof Element)
+ traverser.attach(Attachable.Method.get(this.master.partitioner().getPartition((Element) traverser.get())));
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 127322f..fa5645d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -40,6 +40,7 @@ import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.HashMap;
@@ -99,10 +100,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
//System.out.println(message + "::" + this.isLeader);
if (message instanceof StartMessage) {
// initial message from master that says: "start processing"
- final GraphStep<?,?> step = (GraphStep) this.matrix.getTraversal().getStartStep();
+ final GraphStep<?, ?> step = (GraphStep) this.matrix.getTraversal().getStartStep();
while (step.hasNext()) {
final Traverser.Admin<? extends Element> traverser = step.next();
- this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(), traverser);
+ this.self.send(traverser.isHalted() ? this.self.master() : this.self.address(), this.detachTraverser(traverser));
}
} else if (message instanceof Traverser.Admin) {
this.processTraverser((Traverser.Admin) message);
@@ -147,20 +148,26 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
//////////////
private void processTraverser(final Traverser.Admin traverser) {
- assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.barriers.put(step.getId(), (Barrier) step);
- } else {
- while (step.hasNext()) {
- this.sendTraverser(step.next());
+ assert !(traverser.get() instanceof Element) || this.self.partition().contains((Element) traverser.get());
+ if (traverser.isHalted())
+ this.sendTraverser(traverser);
+ else {
+ this.attachTraverser(traverser);
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ this.barriers.put(step.getId(), (Barrier) step);
+ } else {
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
}
}
}
private void sendTraverser(final Traverser.Admin traverser) {
this.voteToHalt = false;
+ this.detachTraverser(traverser);
if (traverser.isHalted())
this.self.send(this.self.master(), traverser);
else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
@@ -168,4 +175,14 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
else
this.self.send(this.self.address(), traverser);
}
+
+ private final Traverser.Admin detachTraverser(final Traverser.Admin traverser) {
+ return true ? traverser : traverser.detach();
+ }
+
+ private final Traverser.Admin attachTraverser(final Traverser.Admin traverser) {
+ if (false)
+ traverser.attach(Attachable.Method.get(this.self.partition()));
+ return traverser;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
index ac4c61d..ade6796 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -27,8 +27,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
*/
public final class BarrierAddMessage {
- private final Object barrier;
- private final String stepId;
+ private Object barrier;
+ private String stepId;
+
+ private BarrierAddMessage() {
+ // for serialization
+ }
public BarrierAddMessage(final Barrier barrier) {
this.barrier = barrier.nextBarrier();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
index 1c0a9de..bcc3223 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
@@ -24,8 +24,12 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
*/
public final class SideEffectAddMessage {
- private final String key;
- private final Object value;
+ private String key;
+ private Object value;
+
+ private SideEffectAddMessage() {
+ // for serialization
+ }
public SideEffectAddMessage(final String key, final Object value) {
this.value = value;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
index f20b9fb..49389f1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/Partition.java
@@ -19,6 +19,8 @@
package org.apache.tinkerpop.gremlin.structure;
+import org.apache.tinkerpop.gremlin.structure.util.Host;
+
import java.net.InetAddress;
import java.net.URI;
import java.util.Iterator;
@@ -32,7 +34,7 @@ import java.util.UUID;
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface Partition {
+public interface Partition extends Host {
/**
* Whether or not this element was, is, or will be contained in this partition.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
index fa999aa..f748ee6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/Attachable.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -75,21 +76,27 @@ public interface Attachable<V> {
if (base instanceof Vertex) {
final Optional<Vertex> optional = hostVertexOrGraph instanceof Graph ?
Method.getVertex((Attachable<Vertex>) attachable, (Graph) hostVertexOrGraph) :
- Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph);
+ hostVertexOrGraph instanceof Vertex ?
+ Method.getVertex((Attachable<Vertex>) attachable, (Vertex) hostVertexOrGraph) :
+ Method.getVertex((Attachable<Vertex>) attachable, (Partition) hostVertexOrGraph);
return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
} else if (base instanceof Edge) {
final Optional<Edge> optional = hostVertexOrGraph instanceof Graph ?
Method.getEdge((Attachable<Edge>) attachable, (Graph) hostVertexOrGraph) :
- Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph);
+ hostVertexOrGraph instanceof Vertex ?
+ Method.getEdge((Attachable<Edge>) attachable, (Vertex) hostVertexOrGraph) :
+ Method.getEdge((Attachable<Edge>) attachable, (Partition) hostVertexOrGraph);
return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
} else if (base instanceof VertexProperty) {
final Optional<VertexProperty> optional = hostVertexOrGraph instanceof Graph ?
Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Graph) hostVertexOrGraph) :
- Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph);
+ hostVertexOrGraph instanceof Vertex ?
+ Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Vertex) hostVertexOrGraph) :
+ Method.getVertexProperty((Attachable<VertexProperty>) attachable, (Partition) hostVertexOrGraph);
return (V) optional.orElseThrow(() -> hostVertexOrGraph instanceof Graph ?
Attachable.Exceptions.canNotGetAttachableFromHostGraph(attachable, (Graph) hostVertexOrGraph) :
Attachable.Exceptions.canNotGetAttachableFromHostVertex(attachable, (Vertex) hostVertexOrGraph));
@@ -178,6 +185,11 @@ public interface Attachable<V> {
return ElementHelper.areEqual(attachableVertex.get(), hostVertex) ? Optional.of(hostVertex) : Optional.empty();
}
+ public static Optional<Vertex> getVertex(final Attachable<Vertex> attachableVertex, final Partition hostPartition) {
+ final Iterator<Vertex> iterator = hostPartition.vertices(attachableVertex.get().id());
+ return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
+ }
+
public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Graph hostGraph) {
final Iterator<Edge> edgeIterator = hostGraph.edges(attachableEdge.get().id());
return edgeIterator.hasNext() ? Optional.of(edgeIterator.next()) : Optional.empty();
@@ -194,6 +206,11 @@ public interface Attachable<V> {
return Optional.empty();
}
+ public static Optional<Edge> getEdge(final Attachable<Edge> attachableEdge, final Partition hostPartition) {
+ final Iterator<Edge> iterator = hostPartition.edges(attachableEdge.get().id());
+ return iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty();
+ }
+
public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Graph hostGraph) {
final VertexProperty baseVertexProperty = attachableVertexProperty.get();
final Iterator<Vertex> vertexIterator = hostGraph.vertices(baseVertexProperty.element().id());
@@ -219,6 +236,20 @@ public interface Attachable<V> {
return Optional.empty();
}
+ public static Optional<VertexProperty> getVertexProperty(final Attachable<VertexProperty> attachableVertexProperty, final Partition hostPartition) {
+ final VertexProperty baseVertexProperty = attachableVertexProperty.get();
+ final Iterator<Vertex> vertexIterator= hostPartition.vertices(baseVertexProperty.element().id());
+ if (vertexIterator.hasNext()) {
+ final Iterator<VertexProperty<Object>> vertexPropertyIterator = vertexIterator.next().properties(baseVertexProperty.key());
+ while (vertexPropertyIterator.hasNext()) {
+ final VertexProperty vertexProperty = vertexPropertyIterator.next();
+ if (ElementHelper.areEqual(vertexProperty, baseVertexProperty))
+ return Optional.of(vertexProperty);
+ }
+ }
+ return Optional.empty();
+ }
+
public static Optional<Property> getProperty(final Attachable<Property> attachableProperty, final Graph hostGraph) {
final Property baseProperty = attachableProperty.get();
final Element propertyElement = attachableProperty.get().element();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index 4d9f565..397c113 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -19,19 +19,24 @@
package org.apache.tinkerpop.gremlin.structure.util.partitioner;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,12 +66,14 @@ public final class GlobalPartitioner implements Partitioner {
private class GlobalPartition implements Partition {
- private final Graph graph;
+ private transient Graph graph;
+ private final Map<String, Object> configuration = new HashMap<>();
private final String id;
private final InetAddress location;
private GlobalPartition(final Graph graph) {
this.graph = graph;
+ graph.configuration().getKeys().forEachRemaining(key -> configuration.put(key, graph.configuration().getProperty(key)));
this.id = "global-" + graph.getClass().getSimpleName().toLowerCase();
try {
this.location = InetAddress.getLocalHost();
@@ -82,11 +89,15 @@ public final class GlobalPartitioner implements Partitioner {
@Override
public Iterator<Vertex> vertices(final Object... ids) {
+ if(null == this.graph)
+ this.graph = GraphFactory.open(new MapConfiguration(this.configuration));
return this.graph.vertices(ids);
}
@Override
public Iterator<Edge> edges(final Object... ids) {
+ if(null == this.graph)
+ this.graph = GraphFactory.open(new MapConfiguration(this.configuration));
return this.graph.edges(ids);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/c440326c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
new file mode 100644
index 0000000..2a1eac1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/config/SerializableConfiguration.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.util.config;
+
+import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SerializableConfiguration extends AbstractConfiguration implements Serializable {
+
+ private final Map<String, Object> properties = new HashMap<>();
+
+ public SerializableConfiguration() {
+ super();
+ super.setDelimiterParsingDisabled(true);
+ }
+
+ public SerializableConfiguration(final Configuration configuration) {
+ this();
+ this.copy(configuration);
+ }
+
+ @Override
+ protected void addPropertyDirect(final String key, final Object value) {
+ this.properties.put(key, value);
+ }
+
+ @Override
+ protected void clearPropertyDirect(final String key) {
+ this.properties.remove(key);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return this.properties.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(final String key) {
+ return this.properties.containsKey(key);
+ }
+
+ @Override
+ public Object getProperty(final String key) {
+ return this.properties.get(key);
+ }
+
+ @Override
+ public Iterator<String> getKeys() {
+ return this.properties.keySet().iterator();
+ }
+
+}