You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/13 18:57:01 UTC
[45/50] [abbrv] tinkerpop git commit: Added AkkaConfigFactory which
deals with creating an Akka Config. Identical in nature to SparkConf and
Hadoop's Configuration. Akka GryoSerializer pimp'd up a bit. I had a default
pool size of 256. Test cases were sl
Added AkkaConfigFactory which deals with creating an Akka Config. Identical in nature to SparkConf and Hadoop's Configuration. Akka GryoSerializer pimp'd up a bit. I had a default pool size of 256. Test cases were slow. Changed to 1 -- BOOMIN fast. Wonder what the nature of serializer reuse is in Akka.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/bdd14c25
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/bdd14c25
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/bdd14c25
Branch: refs/heads/TINKERPOP-1564
Commit: bdd14c25e98cc42f00b80dd89dde79dce00d7ce5
Parents: f0d2f26
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 12 07:24:12 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700
----------------------------------------------------------------------
.../akka/process/actors/AkkaConfigFactory.java | 62 ++++++++++++++++++++
.../akka/process/actors/AkkaGraphActors.java | 26 +-------
.../akka/process/actors/io/GryoSerializer.java | 17 +++---
.../src/main/resources/application.conf | 48 ++++++---------
4 files changed, 88 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bdd14c25/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
new file mode 100644
index 0000000..a85e25a
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class AkkaConfigFactory {
+
+ private AkkaConfigFactory() {
+ // static method class
+ }
+
+ static Config generateAkkaConfig(final ActorProgram actorProgram) {
+ final Map<String, String> registeredGryoClasses = new HashMap<>();
+ new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> {
+ int index = clazz.getCanonicalName().lastIndexOf(".");
+ registeredGryoClasses.put(null == clazz.getEnclosingClass() ?
+ clazz.getCanonicalName() :
+ clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo");
+ });
+ return ConfigFactory.defaultApplication().
+ withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses)).
+ withValue("custom-dispatcher.mailbox-requirement", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName())).
+ withValue("custom-dispatcher-mailbox.mailbox-type", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName())).
+ withValue("akka.actor.mailbox.requirements", ConfigValueFactory.fromMap(Collections.singletonMap(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName(), "custom-dispatcher-mailbox"))).
+ withValue("message-priorities",
+ ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().
+ orElse(Collections.singletonList(Object.class)).
+ stream().
+ map(Class::getCanonicalName).
+ collect(Collectors.toList()).toString()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bdd14c25/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 7641b01..2638bfa 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -21,12 +21,8 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.Address;
@@ -41,12 +37,8 @@ import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -93,24 +85,8 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
if (this.executed)
throw new IllegalStateException("Can not execute twice");
this.executed = true;
- final Map<String, String> registeredGryoClasses = new HashMap<>();
- new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> {
- int index = clazz.getCanonicalName().lastIndexOf(".");
- registeredGryoClasses.put(null == clazz.getEnclosingClass() ?
- clazz.getCanonicalName() :
- clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo");
- });
- final Config config = ConfigFactory.defaultApplication().
- withValue("message-priorities",
- ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().
- orElse(Collections.singletonList(Object.class)).
- stream().
- map(Class::getCanonicalName).
- collect(Collectors.toList()).toString())).
- withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses));
-
- final ActorSystem system = ActorSystem.create("traversal", config);
+ final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
final ActorsResult<R> result = new DefaultActorsResult<>();
final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers);
try {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bdd14c25/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
index 188ba9f..c567497 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
@@ -45,7 +45,7 @@ public final class GryoSerializer implements Serializer {
public GryoSerializer() {
this.gryoPool = GryoPool.build().
- poolSize(100).
+ poolSize(1).
initializeMapper(builder ->
builder.referenceTracking(true).
registrationRequired(true).
@@ -65,16 +65,15 @@ public final class GryoSerializer implements Serializer {
@Override
public int identifier() {
- return 0;
+ return GryoVersion.V3_0.hashCode();
}
@Override
public byte[] toBinary(final Object object) {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- final Output output = new Output(outputStream);
- this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object));
+ final Output output = new Output(new ByteArrayOutputStream());
+ this.gryoPool.writeWithKryo(kryo -> kryo.writeObject(output, object));
output.flush();
- return outputStream.toByteArray();
+ return output.getBuffer();
}
@Override
@@ -96,8 +95,8 @@ public final class GryoSerializer implements Serializer {
@Override
public Object fromBinary(byte[] bytes, Class<?> aClass) {
- final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
- final Input input = new Input(inputStream);
- return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object
+ final Input input = new Input();
+ input.setBuffer(bytes);
+ return this.gryoPool.readWithKryo(kryo -> kryo.readObject(input, aClass)); // todo: be smart about just reading object
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/bdd14c25/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index d1dbf4a..cd8b190 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -1,38 +1,24 @@
akka {
log-dead-letters-during-shutdown = "false"
-}
-
-custom-dispatcher-mailbox {
- mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
-}
-
-custom-dispatcher {
- mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics"
-}
-
-akka.actor {
- provider = remote
- serialize-messages = on
- serializers {
- gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+ actor {
+ provider = remote
+ serialize-messages = on
+ serializers {
+ gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+ }
}
- mailbox.requirements {
- "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+ remote {
+ enabled-transports = ["akka.remote.netty.tcp"]
+ netty.tcp {
+ hostname = "127.0.0.1"
+ port = 2552
+ }
}
-}
+ cluster {
+ seed-nodes = [
+ "akka.tcp://traversal@127.0.0.1:2551",
+ "akka.tcp://traversal@127.0.0.1:2552"]
-akka.remote {
- enabled-transports = ["akka.remote.netty.tcp"]
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2552
+ auto-down-unreachable-after = 10s
}
-}
-
-akka.cluster {
- seed-nodes = [
- "akka.tcp://traversal@127.0.0.1:2551",
- "akka.tcp://traversal@127.0.0.1:2552"]
-
- auto-down-unreachable-after = 10s
}
\ No newline at end of file