You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/06/22 15:25:52 UTC
[6/8] usergrid git commit: Move cluster-singleton router config into
RouterProducer and out of actorsystem module.
Move cluster-singleton router config into RouterProducer and out of actorsystem module.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fb1d78d0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fb1d78d0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fb1d78d0
Branch: refs/heads/usergrid-1268-akka-211
Commit: fb1d78d0469c33067ba415cafa44540c5185dd61
Parents: 343ac51
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Jun 22 10:33:30 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Jun 22 10:33:30 2016 -0400
----------------------------------------------------------------------
.../persistence/actorsystem/ActorSystemFig.java | 29 ++-------
.../actorsystem/ActorSystemManagerImpl.java | 38 +++++------
.../persistence/actorsystem/RouterProducer.java | 6 +-
.../src/main/resources/application.conf | 25 ++++++--
.../collection/guice/CollectionModule.java | 1 +
.../uniquevalues/UniqueValuesFig.java | 67 ++++++++++++++++++++
.../uniquevalues/UniqueValuesServiceImpl.java | 42 ++++++++++--
.../uniquevalues/UniqueValuesTableImpl.java | 8 +--
.../src/test/resources/usergrid.properties | 2 +-
9 files changed, 160 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index 50e860b..ec010d0 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -26,6 +26,7 @@ import org.safehaus.guicyfig.Key;
import java.io.Serializable;
+
@FigSingleton
public interface ActorSystemFig extends GuicyFig, Serializable {
@@ -39,13 +40,10 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
- String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
-
- String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+ String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region";
- String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+ String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node";
- String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region";
/**
* Use Akka or nah
@@ -73,13 +71,6 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
String getRegionList();
/**
- * Number of UniqueValueActors to be started on each node
- */
- @Key(AKKA_UNIQUEVALUE_ACTORS)
- @Default("300")
- int getUniqueValueActors();
-
- /**
* Comma-separated lists of seeds each with format {region}:{hostname}:{port}.
* Regions MUST be listed in the 'usergrid.queue.regionList'
*/
@@ -92,17 +83,11 @@ public interface ActorSystemFig extends GuicyFig, Serializable {
@Key(AKKA_AUTHORITATIVE_REGION)
String getAkkaAuthoritativeRegion();
- /**
- * Unique Value cache TTL in seconds.
- */
- @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
- @Default("10")
- int getUniqueValueCacheTtl();
/**
- * Unique Value Reservation TTL in seconds.
+ * Number of actor instances to create on each node for each router.
*/
- @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
- @Default("10")
- int getUniqueValueReservationTtl();
+ @Key(AKKA_INSTANCES_PER_NODE)
+ @Default("300")
+ int getInstancesPerNode();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index b3af978..1f7bf70 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -223,7 +223,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}
}
- int numInstancesPerNode = actorSystemFig.getUniqueValueActors();
+ int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
// read config file once for each region
@@ -236,15 +236,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
// cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions
String clusterRole = currentRegion.equals( region ) ? "io" : "client";
- logger.info( "Config for region {} is:\n" +
- " Akka Hostname {}\n" +
- " Akka Seeds {}\n" +
- " Akka UniqueValueActors per node {}\n" +
- " Akka Authoritative Region {}",
- region, hostname, seeds, port, numInstancesPerNode, actorSystemFig.getAkkaAuthoritativeRegion() );
+ logger.info( "Akka Config for region {} is:\n" +
+ " Hostname {}\n" +
+ " Seeds {}\n" +
+ " Authoritative Region {}\n",
+ region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
Map<String, Object> configMap = new HashMap<String, Object>() {{
+
put( "akka", new HashMap<String, Object>() {{
+
put( "remote", new HashMap<String, Object>() {{
put( "netty.tcp", new HashMap<String, Object>() {{
put( "hostname", hostname );
@@ -252,8 +253,9 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
put( "port", regionPort );
}} );
}} );
+
put( "cluster", new HashMap<String, Object>() {{
- put( "max-nr-of-instances-per-node", numInstancesPerNode );
+ put( "max-nr-of-instances-per-node", 300);
put( "roles", Collections.singletonList(clusterRole) );
put( "seed-nodes", new ArrayList<String>() {{
for (String seed : seeds) {
@@ -262,24 +264,16 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
}} );
}} );
- // TODO: allow RouterProducers to add in router-specific stuff like this:
- put( "actor", new HashMap<String, Object>() {{
- put( "deployment", new HashMap<String, Object>() {{
- put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{
- put( "cluster", new HashMap<String, Object>() {{
- //put( "roles", Collections.singletonList(role) );
- put( "max-nr-of-instances-per-node", numInstancesPerNode );
- }} );
- }} );
- }} );
- }} );
}} );
}};
- Config config = ConfigFactory
- .parseMap( configMap )
+ for ( RouterProducer routerProducer : routerProducers ) {
+ routerProducer.addConfiguration( configMap );
+ }
+
+ Config config = ConfigFactory.parseMap( configMap )
.withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) )
- .withFallback( ConfigFactory.load( "cluster-singleton" ) );
+ .withFallback( ConfigFactory.load( "application.conf" ) );
configs.put( region, config );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
index ac2c7ee..3aa91cf 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
@@ -19,7 +19,6 @@
package org.apache.usergrid.persistence.actorsystem;
import akka.actor.ActorSystem;
-
import java.util.Map;
@@ -42,4 +41,9 @@ public interface RouterProducer {
*/
void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap );
+ /**
+ * Add configuration for the router to configuration map
+ */
+ void addConfiguration( Map<String, Object> configMap );
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/actorsystem/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf
index 93854f9..a243163 100644
--- a/stack/corepersistence/actorsystem/src/main/resources/application.conf
+++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf
@@ -1,13 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
akka {
-
+
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "ERROR"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-
+
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
-
+
remote {
log-remote-lifecycle-events = off
netty.tcp {
@@ -24,5 +41,5 @@ akka.cluster.metrics.enabled=off
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"]
# Sigar native library extract location during tests.
-# Note: use per-jvm-instance folder when running multiple jvm on one host.
+# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index ae73e47..daf3fdc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -59,6 +59,7 @@ public abstract class CollectionModule extends AbstractModule {
// noinspection unchecked
install( new GuicyFigModule( SerializationFig.class ) );
install( new GuicyFigModule( CollectionSchedulerFig.class ) );
+ install( new GuicyFigModule( UniqueValuesFig.class ) );
install( new SerializationModule() );
install( new ServiceModule() );
install( new ActorSystemModule() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
new file mode 100644
index 0000000..c99824f
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.uniquevalues;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+import java.io.Serializable;
+
+
+@FigSingleton
+public interface UniqueValuesFig extends GuicyFig, Serializable {
+
+ String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
+
+ String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+
+ String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+
+ String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node";
+
+
+ /**
+ * Number of UniqueValueActors to be started on each node.
+ */
+ @Key(AKKA_UNIQUEVALUE_ACTORS)
+ @Default("300")
+ int getUniqueValueActors();
+
+ /**
+ * Unique Value cache TTL in seconds.
+ */
+ @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
+ @Default("10")
+ int getUniqueValueCacheTtl();
+
+ /**
+ * Unique Value Reservation TTL in seconds.
+ */
+ @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
+ @Default("10")
+ int getUniqueValueReservationTtl();
+
+ /**
+ * Number of unique value router instances to create on each node (max-nr-of-instances-per-node).
+ */
+ @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE)
+ @Default("300")
+ int getUniqueValueInstancesPerNode();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 85b9d1a..6035e04 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -52,7 +53,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class );
static Injector injector;
- ActorSystemFig actorSystemFig;
+ UniqueValuesFig uniqueValuesFig;
ActorSystemManager actorSystemManager;
UniqueValuesTable table;
private ReservationCache reservationCache;
@@ -61,16 +62,16 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
@Inject
public UniqueValuesServiceImpl(
Injector inj,
- ActorSystemFig actorSystemFig,
+ UniqueValuesFig uniqueValuesFig,
ActorSystemManager actorSystemManager,
UniqueValuesTable table ) {
injector = inj;
this.actorSystemManager = actorSystemManager;
- this.actorSystemFig = actorSystemFig;
+ this.uniqueValuesFig = uniqueValuesFig;
this.table = table;
- ReservationCache.init( actorSystemFig.getUniqueValueCacheTtl() );
+ ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() );
this.reservationCache = ReservationCache.getInstance();
}
@@ -300,4 +301,37 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
subscribeToReservations( localSystem, systemMap );
}
+
+    /**
+     * Add the unique values router deployment to the Akka configuration map. Merges into any
+     * existing "akka", "actor" and "deployment" sections so other routers' entries are kept.
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public void addConfiguration(Map<String, Object> configMap) {
+        int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode();
+
+        Map<String, Object> deployment = configMap;
+        for ( String section : new String[] { "akka", "actor", "deployment" } ) {
+            deployment = (Map<String, Object>) deployment.computeIfAbsent(
+                section, key -> new HashMap<String, Object>() );
+        }
+        deployment.put( "/uvRouter/singleton/router", new HashMap<String, Object>() {{
+            put( "router", "consistent-hashing-pool" );
+            put( "cluster", new HashMap<String, Object>() {{
+                put( "enabled", "on" );
+                put( "allow-local-routees", "on" );
+                put( "use-role", "io" ); // Akka's key is "use-role"; "user-role" was ignored
+                put( "max-nr-of-instances-per-node", numInstancesPerNode );
+                put( "failure-detector", new HashMap<String, Object>() {{
+                    put( "threshold", "" ); // NOTE(review): empty value; confirm intended threshold
+                    put( "acceptable-heartbeat-pause", "3 s" );
+                    put( "heartbeat-interval", "1 s" );
+                    put( "heartbeat-request", new HashMap<String, Object>() {{
+                        put( "expected-response-after", "3 s" );
+                    }} );
+                }} );
+            }} );
+        }} );
+    }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index de326dd..9cb13be 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@ -42,12 +42,12 @@ public class UniqueValuesTableImpl implements UniqueValuesTable {
private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
final UniqueValueSerializationStrategy strat;
- final ActorSystemFig actorSystemFig;
+ final UniqueValuesFig uniqueValuesFig;
@Inject
- public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, ActorSystemFig actorSystemFig) {
+ public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, UniqueValuesFig uniqueValuesFig) {
this.strat = strat;
- this.actorSystemFig = actorSystemFig;
+ this.uniqueValuesFig = uniqueValuesFig;
}
@@ -63,7 +63,7 @@ public class UniqueValuesTableImpl implements UniqueValuesTable {
public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
UniqueValue uv = new UniqueValueImpl( field, owner, version);
- final MutationBatch write = strat.write( scope, uv, actorSystemFig.getUniqueValueReservationTtl() );
+ final MutationBatch write = strat.write( scope, uv, uniqueValuesFig.getUniqueValueReservationTtl() );
write.execute();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fb1d78d0/stack/corepersistence/collection/src/test/resources/usergrid.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index f20dfe8..759a3b3 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -7,7 +7,7 @@ collection.akka.hostname=localhost
collection.akka.port=2551
collection.akka.region=us-east
usergrid.queue.regionList=us-east
-collection.akka.uniquevalue.authoritative.region=us-east
+collection.akka.authoritative.region=us-east
collection.akka.region.seeds=us-east\:localhost\:2551
collection.akka.uniquevalue.actors=300