You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/12 14:24:17 UTC

tinkerpop git commit: Added AkkaConfigFactory which deals with creating an Akka Config. Identical in nature to SparkConf and Hadoop's Configuration. Akka GryoSerializer pimp'd up a bit. I had a default pool size of 256. Test cases were slow. Changed to 1

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 e9ffa82c2 -> 63847496f


Added AkkaConfigFactory, which is responsible for creating the Akka Config; it is identical in nature to SparkConf and Hadoop's Configuration. Improved the Akka GryoSerializer a bit. I had a default pool size of 256 and the test cases were slow; after changing it to 1 they run much faster. Open question: what is the nature of serializer reuse in Akka?


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/63847496
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/63847496
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/63847496

Branch: refs/heads/TINKERPOP-1564
Commit: 63847496fa48205b12c5df8f7aef24afb5b64ae0
Parents: e9ffa82
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 12 07:24:12 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Jan 12 07:24:12 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/AkkaConfigFactory.java  | 62 ++++++++++++++++++++
 .../akka/process/actors/AkkaGraphActors.java    | 26 +-------
 .../akka/process/actors/io/GryoSerializer.java  | 17 +++---
 .../src/main/resources/application.conf         | 48 ++++++---------
 4 files changed, 88 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63847496/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
new file mode 100644
index 0000000..a85e25a
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -0,0 +1,62 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class AkkaConfigFactory {
+
+    private AkkaConfigFactory() {
+        // static method class
+    }
+
+    static Config generateAkkaConfig(final ActorProgram actorProgram) {
+        final Map<String, String> registeredGryoClasses = new HashMap<>();
+        new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> {
+            int index = clazz.getCanonicalName().lastIndexOf(".");
+            registeredGryoClasses.put(null == clazz.getEnclosingClass() ?
+                    clazz.getCanonicalName() :
+                    clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo");
+        });
+        return ConfigFactory.defaultApplication().
+                withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses)).
+                withValue("custom-dispatcher.mailbox-requirement", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName())).
+                withValue("custom-dispatcher-mailbox.mailbox-type", ConfigValueFactory.fromAnyRef(ActorMailbox.class.getCanonicalName())).
+                withValue("akka.actor.mailbox.requirements", ConfigValueFactory.fromMap(Collections.singletonMap(ActorMailbox.class.getCanonicalName() + "$" + ActorMailbox.ActorSemantics.class.getSimpleName(), "custom-dispatcher-mailbox"))).
+                withValue("message-priorities",
+                        ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().
+                                orElse(Collections.singletonList(Object.class)).
+                                stream().
+                                map(Class::getCanonicalName).
+                                collect(Collectors.toList()).toString()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63847496/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 7641b01..2638bfa 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -21,12 +21,8 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
 
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.Address;
@@ -41,12 +37,8 @@ import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -93,24 +85,8 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
         if (this.executed)
             throw new IllegalStateException("Can not execute twice");
         this.executed = true;
-        final Map<String, String> registeredGryoClasses = new HashMap<>();
-        new GryoSerializer().getGryoMapper().getRegisteredClasses().stream().filter(clazz -> !clazz.isArray()).forEach(clazz -> {
-            int index = clazz.getCanonicalName().lastIndexOf(".");
-            registeredGryoClasses.put(null == clazz.getEnclosingClass() ?
-                    clazz.getCanonicalName() :
-                    clazz.getCanonicalName().substring(0, index) + "$" + clazz.getCanonicalName().substring(index + 1), "gryo");
-        });
-        final Config config = ConfigFactory.defaultApplication().
-                withValue("message-priorities",
-                        ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().
-                                orElse(Collections.singletonList(Object.class)).
-                                stream().
-                                map(Class::getCanonicalName).
-                                collect(Collectors.toList()).toString())).
-                withValue("akka.actor.serialization-bindings", ConfigValueFactory.fromMap(registeredGryoClasses));
-
 
-        final ActorSystem system = ActorSystem.create("traversal", config);
+        final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
         final ActorsResult<R> result = new DefaultActorsResult<>();
         final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers);
         try {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63847496/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
index 188ba9f..c567497 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/io/GryoSerializer.java
@@ -45,7 +45,7 @@ public final class GryoSerializer implements Serializer {
 
     public GryoSerializer() {
         this.gryoPool = GryoPool.build().
-                poolSize(100).
+                poolSize(1).
                 initializeMapper(builder ->
                         builder.referenceTracking(true).
                                 registrationRequired(true).
@@ -65,16 +65,15 @@ public final class GryoSerializer implements Serializer {
 
     @Override
     public int identifier() {
-        return 0;
+        return GryoVersion.V3_0.hashCode();
     }
 
     @Override
     public byte[] toBinary(final Object object) {
-        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        final Output output = new Output(outputStream);
-        this.gryoPool.writeWithKryo(kryo -> kryo.writeClassAndObject(output, object));
+        final Output output = new Output(new ByteArrayOutputStream());
+        this.gryoPool.writeWithKryo(kryo -> kryo.writeObject(output, object));
         output.flush();
-        return outputStream.toByteArray();
+        return output.getBuffer();
     }
 
     @Override
@@ -96,8 +95,8 @@ public final class GryoSerializer implements Serializer {
 
     @Override
     public Object fromBinary(byte[] bytes, Class<?> aClass) {
-        final ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
-        final Input input = new Input(inputStream);
-        return this.gryoPool.readWithKryo(kryo -> kryo.readClassAndObject(input)); // todo: be smart about just reading object
+        final Input input = new Input();
+        input.setBuffer(bytes);
+        return this.gryoPool.readWithKryo(kryo -> kryo.readObject(input, aClass)); // todo: be smart about just reading object
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/63847496/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index d1dbf4a..cd8b190 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -1,38 +1,24 @@
 akka {
   log-dead-letters-during-shutdown = "false"
-}
-
-custom-dispatcher-mailbox {
-  mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
-}
-
-custom-dispatcher {
-  mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics"
-}
-
-akka.actor {
-  provider = remote
-  serialize-messages = on
-  serializers {
-    gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+  actor {
+    provider = remote
+    serialize-messages = on
+    serializers {
+      gryo = "org.apache.tinkerpop.gremlin.akka.process.actors.io.GryoSerializer"
+    }
   }
-  mailbox.requirements {
-    "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+  remote {
+    enabled-transports = ["akka.remote.netty.tcp"]
+    netty.tcp {
+      hostname = "127.0.0.1"
+      port = 2552
+    }
   }
-}
+  cluster {
+    seed-nodes = [
+      "akka.tcp://traversal@127.0.0.1:2551",
+      "akka.tcp://traversal@127.0.0.1:2552"]
 
-akka.remote {
-  enabled-transports = ["akka.remote.netty.tcp"]
-  netty.tcp {
-    hostname = "127.0.0.1"
-    port = 2552
+    auto-down-unreachable-after = 10s
   }
-}
-
-akka.cluster {
-  seed-nodes = [
-    "akka.tcp://traversal@127.0.0.1:2551",
-    "akka.tcp://traversal@127.0.0.1:2552"]
-
-  auto-down-unreachable-after = 10s
 }
\ No newline at end of file