You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/06/22 15:25:52 UTC

[6/8] usergrid git commit: Move cluster-singleton router config into RouterProducer and out of actorsystem module.

Move cluster-singleton router config into RouterProducer and out of actorsystem module.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fb1d78d0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fb1d78d0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fb1d78d0

Branch: refs/heads/usergrid-1268-akka-211
Commit: fb1d78d0469c33067ba415cafa44540c5185dd61
Parents: 343ac51
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jun 22 10:33:30 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jun 22 10:33:30 2016 -0400

----------------------------------------------------------------------
 .../persistence/actorsystem/ActorSystemFig.java | 29 ++-------
 .../actorsystem/ActorSystemManagerImpl.java     | 38 +++++------
 .../persistence/actorsystem/RouterProducer.java |  6 +-
 .../src/main/resources/application.conf         | 25 ++++++--
 .../collection/guice/CollectionModule.java      |  1 +
 .../uniquevalues/UniqueValuesFig.java           | 67 ++++++++++++++++++++
 .../uniquevalues/UniqueValuesServiceImpl.java   | 42 ++++++++++--
 .../uniquevalues/UniqueValuesTableImpl.java     |  8 +--
 .../src/test/resources/usergrid.properties      |  2 +-
 9 files changed, 160 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index 50e860b..ec010d0 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -26,6 +26,7 @@ import org.safehaus.guicyfig.Key;
 
 import java.io.Serializable;
 
+
 @FigSingleton
 public interface ActorSystemFig extends GuicyFig, Serializable {
 
@@ -39,13 +40,10 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
 
     String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
 
-    String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
-
-    String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+    String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region";
 
-    String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+    String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node";
 
-    String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region";
 
     /**
      * Use Akka or nah
@@ -73,13 +71,6 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
     String getRegionList();
 
     /**
-     * Number of UniqueValueActors to be started on each node
-     */
-    @Key(AKKA_UNIQUEVALUE_ACTORS)
-    @Default("300")
-    int getUniqueValueActors();
-
-    /**
      * Comma-separated lists of seeds each with format {region}:{hostname}:{port}.
      * Regions MUST be listed in the 'usergrid.queue.regionList'
      */
@@ -92,17 +83,11 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
     @Key(AKKA_AUTHORITATIVE_REGION)
     String getAkkaAuthoritativeRegion();
 
-    /**
-     * Unique Value cache TTL in seconds.
-     */
-    @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
-    @Default("10")
-    int getUniqueValueCacheTtl();
 
     /**
-     * Unique Value Reservation TTL in seconds.
+     * Number of actor instances to create on each node for each router.
      */
-    @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
-    @Default("10")
-    int getUniqueValueReservationTtl();
+    @Key(AKKA_INSTANCES_PER_NODE)
+    @Default("300")
+    int getInstancesPerNode();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index b3af978..1f7bf70 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -223,7 +223,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                 }
             }
 
-            int numInstancesPerNode = actorSystemFig.getUniqueValueActors();
+            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
 
             // read config file once for each region
 
@@ -236,15 +236,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                 // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions
                 String clusterRole = currentRegion.equals( region ) ? "io" : "client";
 
-                logger.info( "Config for region {} is:\n" +
-                        "   Akka Hostname {}\n" +
-                        "   Akka Seeds {}\n" +
-                        "   Akka UniqueValueActors per node {}\n" +
-                        "   Akka Authoritative Region {}",
-                    region, hostname, seeds, port, numInstancesPerNode, actorSystemFig.getAkkaAuthoritativeRegion() );
+                logger.info( "Akka Config for region {} is:\n" +
+                        "   Hostname {}\n" +
+                        "   Seeds {}\n" +
+                        "   Authoritative Region {}\n",
+                    region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
 
                 Map<String, Object> configMap = new HashMap<String, Object>() {{
+
                     put( "akka", new HashMap<String, Object>() {{
+
                         put( "remote", new HashMap<String, Object>() {{
                             put( "netty.tcp", new HashMap<String, Object>() {{
                                 put( "hostname", hostname );
@@ -252,8 +253,9 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                                 put( "port", regionPort );
                             }} );
                         }} );
+
                         put( "cluster", new HashMap<String, Object>() {{
-                            put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                            put( "max-nr-of-instances-per-node", 300);
                             put( "roles", Collections.singletonList(clusterRole) );
                             put( "seed-nodes", new ArrayList<String>() {{
                                 for (String seed : seeds) {
@@ -262,24 +264,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                             }} );
                         }} );
 
-                        // TODO: allow RouterProducers to add in router-specific stuff like this:
-                        put( "actor", new HashMap<String, Object>() {{
-                            put( "deployment", new HashMap<String, Object>() {{
-                                put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{
-                                    put( "cluster", new HashMap<String, Object>() {{
-                                        //put( "roles", Collections.singletonList(role) );
-                                        put( "max-nr-of-instances-per-node", numInstancesPerNode );
-                                    }} );
-                                }} );
-                            }} );
-                        }} );
                     }} );
                 }};
 
-                Config config = ConfigFactory
-                    .parseMap( configMap )
+                for ( RouterProducer routerProducer : routerProducers ) {
+                    routerProducer.addConfiguration( configMap );
+                }
+
+                Config config = ConfigFactory.parseMap( configMap )
                     .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) )
-                    .withFallback( ConfigFactory.load( "cluster-singleton" ) );
+                    .withFallback( ConfigFactory.load( "application.conf" ) );
 
                 configs.put( region, config );
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
index ac2c7ee..3aa91cf 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
@@ -19,7 +19,6 @@
 package org.apache.usergrid.persistence.actorsystem;
 
 import akka.actor.ActorSystem;
-
 import java.util.Map;
 
 
@@ -42,4 +41,9 @@ public interface RouterProducer {
      */
     void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap );
 
+    /**
+     * Add configuration for the router to configuration map
+     */
+    void addConfiguration( Map<String, Object> configMap );
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf
index 93854f9..a243163 100644
--- a/stack/corepersistence/actorsystem/src/main/resources/application.conf
+++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf
@@ -1,13 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 akka {
-  
+
   loggers = ["akka.event.slf4j.Slf4jLogger"]
   loglevel = "ERROR"
   logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-  
+
   actor {
     provider = "akka.cluster.ClusterActorRefProvider"
   }
-  
+
   remote {
     log-remote-lifecycle-events = off
     netty.tcp {
@@ -24,5 +41,5 @@ akka.cluster.metrics.enabled=off
 akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"]
 
 # Sigar native library extract location during tests.
-# Note: use per-jvm-instance folder when running multiple jvm on one host. 
+# Note: use per-jvm-instance folder when running multiple jvm on one host.
 akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index ae73e47..daf3fdc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -59,6 +59,7 @@ public abstract class CollectionModule extends AbstractModule {
         // noinspection unchecked
         install( new GuicyFigModule( SerializationFig.class ) );
         install( new GuicyFigModule( CollectionSchedulerFig.class ) );
+        install( new GuicyFigModule( UniqueValuesFig.class ) );
         install( new SerializationModule() );
         install( new ServiceModule() );
         install( new ActorSystemModule() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
new file mode 100644
index 0000000..c99824f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+import java.io.Serializable;
+
+
+@FigSingleton
+public interface UniqueValuesFig extends GuicyFig, Serializable {
+
+    String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
+
+    String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+
+    String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+
+    String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node";
+
+
+    /**
+     * Number of UniqueValueActors to be started on each node
+     */
+    @Key(AKKA_UNIQUEVALUE_ACTORS)
+    @Default("300")
+    int getUniqueValueActors();
+
+    /**
+     * Unique Value cache TTL in seconds.
+     */
+    @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
+    @Default("10")
+    int getUniqueValueCacheTtl();
+
+    /**
+     * Unique Value Reservation TTL in seconds.
+     */
+    @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
+    @Default("10")
+    int getUniqueValueReservationTtl();
+
+    /**
+     * Number of actor instances to create on each.
+     */
+    @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE)
+    @Default("300")
+    int getUniqueValueInstancesPerNode();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 85b9d1a..6035e04 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +53,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class );
 
     static Injector          injector;
-    ActorSystemFig           actorSystemFig;
+    UniqueValuesFig          uniqueValuesFig;
     ActorSystemManager       actorSystemManager;
     UniqueValuesTable        table;
     private ReservationCache reservationCache;
@@ -61,16 +62,16 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     @Inject
     public UniqueValuesServiceImpl(
         Injector inj,
-        ActorSystemFig actorSystemFig,
+        UniqueValuesFig uniqueValuesFig,
         ActorSystemManager actorSystemManager,
         UniqueValuesTable table ) {
 
         injector = inj;
         this.actorSystemManager = actorSystemManager;
-        this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
         this.table = table;
 
-        ReservationCache.init( actorSystemFig.getUniqueValueCacheTtl() );
+        ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() );
         this.reservationCache = ReservationCache.getInstance();
     }
 
@@ -300,4 +301,37 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
         subscribeToReservations( localSystem, systemMap );
     }
+
+    @Override
+    public void addConfiguration(Map<String, Object> configMap) {
+
+        int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode();
+
+        // TODO: will the below overwrite things other routers have added under "actor.deployment"?
+
+        Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
+
+        akka.put( "actor", new HashMap<String, Object>() {{
+            put( "deployment", new HashMap<String, Object>() {{
+                put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{
+                    put( "router", "consistent-hashing-pool" );
+                    put( "cluster", new HashMap<String, Object>() {{
+                        put( "enabled", "on" );
+                        put( "allow-local-routees", "on" );
+                        put( "user-role", "io" );
+                        put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                        put( "failure-detector", new HashMap<String, Object>() {{
+                            put( "threshold", "" );
+                            put( "acceptable-heartbeat-pause", "3 s" );
+                            put( "heartbeat-interval", "1 s" );
+                            put( "heartbeat-request", new HashMap<String, Object>() {{
+                                put( "expected-response-after", "3 s" );
+                            }} );
+                        }} );
+                    }} );
+                }} );
+            }} );
+        }} );
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index de326dd..9cb13be 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@ -42,12 +42,12 @@ public class UniqueValuesTableImpl implements UniqueValuesTable {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
 
     final UniqueValueSerializationStrategy strat;
-    final ActorSystemFig actorSystemFig;
+    final UniqueValuesFig uniqueValuesFig;
 
     @Inject
-    public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, ActorSystemFig actorSystemFig) {
+    public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) {
         this.strat = strat;
-        this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
     }
 
 
@@ -63,7 +63,7 @@ public class UniqueValuesTableImpl implements UniqueValuesTable {
     public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
 
         UniqueValue uv = new UniqueValueImpl( field, owner, version);
-        final MutationBatch write = strat.write( scope, uv, actorSystemFig.getUniqueValueReservationTtl() );
+        final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
         write.execute();
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/test/resources/usergrid.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index f20dfe8..759a3b3 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -7,7 +7,7 @@ collection.akka.hostname=localhost
 collection.akka.port=2551
 collection.akka.region=us-east
 usergrid.queue.regionList=us-east
-collection.akka.uniquevalue.authoritative.region=us-east
+collection.akka.authoritative.region=us-east
 collection.akka.region.seeds=us-east\:localhost\:2551
 
 collection.akka.uniquevalue.actors=300