You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/11 15:35:38 UTC

[28/50] [abbrv] usergrid git commit: Use ClusterClient feature instead of roles to ensure that all write are done only in the appropriate authoritative region.

Use ClusterClient feature instead of roles to ensure that all write are done only in the appropriate authoritative region.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d12307bd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d12307bd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d12307bd

Branch: refs/heads/release-2.1.1
Commit: d12307bdb3219ac87550147cb23cbb0e14155200
Parents: 841409f
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Jun 23 17:33:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Jun 23 17:33:33 2016 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../actorsystem/ActorSystemManager.java         |   8 +-
 .../actorsystem/ActorSystemManagerImpl.java     | 297 ++++++++++---------
 .../persistence/actorsystem/RouterProducer.java |   8 +-
 .../src/main/resources/application.conf         |   7 +-
 .../src/main/resources/cluster-singleton.conf   |  25 --
 .../uniquevalues/UniqueValueActor.java          |   2 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   |  95 +++---
 8 files changed, 223 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 9bd589a..060ec18 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class, "/user/uvProxy" );
                 actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class, "/user/uvProxy" );
                 actorSystemManager.start();
-                actorSystemManager.waitForRequestActors();
+                actorSystemManager.waitForClientActors();
 
             } catch (Throwable t) {
                 logger.error("Error starting Akka", t);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index e2c2913..c45ccac 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -27,7 +27,7 @@ public interface ActorSystemManager {
 
     void start(String hostname, Integer port, String currentRegion);
 
-    void waitForRequestActors();
+    void waitForClientActors();
 
     boolean isReady();
 
@@ -35,5 +35,9 @@ public interface ActorSystemManager {
 
     void registerMessageType( Class messageType, String routerPath );
 
-    ActorRef getClientActor(String region );
+    ActorRef getClientActor();
+
+    ActorRef getClusterClient(String region );
+
+    String getCurrentRegion();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 1f7bf70..5e23c20 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -19,15 +19,15 @@
 package org.apache.usergrid.persistence.actorsystem;
 
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
+import akka.actor.*;
+import akka.cluster.client.ClusterClient;
+import akka.cluster.client.ClusterClientReceptionist;
+import akka.cluster.client.ClusterClientSettings;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -45,22 +45,25 @@ import java.util.concurrent.TimeUnit;
 public class ActorSystemManagerImpl implements ActorSystemManager {
     private static final Logger logger = LoggerFactory.getLogger( ActorSystemManagerImpl.class );
 
+    private boolean started = false;
+
     private String  hostname;
     private Integer port;
     private String  currentRegion;
 
-    private static Injector             injector;
     private final ActorSystemFig        actorSystemFig;
-    private final Map<String, ActorRef> requestActorsByRegion;
     private final List<RouterProducer>  routerProducers = new ArrayList<>();
     private final Map<Class, String>    routersByMessageType = new HashMap<>();
+    private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String, ActorRef>(20);
+
+    private ActorRef                    clientActor;
+
+    private ListMultimap<String, String> seedsByRegion;
 
 
     @Inject
-    public ActorSystemManagerImpl(Injector inj, ActorSystemFig actorSystemFig) {
-        injector = inj;
+    public ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) {
         this.actorSystemFig = actorSystemFig;
-        this.requestActorsByRegion = new HashMap<>();
     }
 
 
@@ -75,7 +78,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         this.port = null;
 
         initAkka();
-        waitForRequestActors();
+        waitForClientActors();
     }
 
 
@@ -95,7 +98,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
     @Override
     public boolean isReady() {
-        return !getRequestActorsByRegion().isEmpty();
+        return started;
     }
 
 
@@ -112,13 +115,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
 
     @Override
-    public ActorRef getClientActor(String region) {
-        return getRequestActorsByRegion().get( region );
+    public ActorRef getClientActor() {
+        return clientActor;
+    }
+
+
+    @Override
+    public ActorRef getClusterClient(String region) {
+        return clusterClientsByRegion.get( region );
     }
 
 
-    private Map<String, ActorRef> getRequestActorsByRegion() {
-        return requestActorsByRegion;
+    @Override
+    public String getCurrentRegion() {
+        return currentRegion;
     }
 
 
@@ -152,215 +162,214 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}",
             hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() );
 
-        final Map<String, ActorSystem> systemMap = new HashMap<>();
-
-        Map<String, Config> configMap = readClusterSingletonConfigs();
+        Config config = readClusterSystemConfig();
 
-        ActorSystem localSystem = createClusterSingletonProxies( configMap, systemMap );
+        ActorSystem localSystem = createClusterSystemsFromConfigs( config );
 
-        createRequestActors( systemMap );
+        createClientActors( localSystem );
 
         for ( RouterProducer routerProducer : routerProducers ) {
-            routerProducer.createLocalSystemActors( localSystem, systemMap );
+            routerProducer.createLocalSystemActors( localSystem );
         }
     }
 
 
     /**
-     * Read configuration and create a Config for each region.
-     *
-     * @return Map of regions to Configs.
+     * Read Usergrid's list of seeds, put them in handy multi-map.
      */
-    private Map<String, Config> readClusterSingletonConfigs() {
+    private ListMultimap<String, String> getSeedsByRegion() {
 
-        Map<String, Config> configs = new HashMap<>();
+        if ( seedsByRegion == null ) {
 
-        ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create();
+            seedsByRegion = ArrayListMultimap.create();
 
-        String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
+            String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
 
-        logger.info("Found region {} seeds {}", regionSeeds.length, regionSeeds);
+            logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );
 
-        try {
+            try {
 
-            if ( port != null ) {
+                if (port != null) {
 
-                // we are testing, create seeds-by-region map for one region, one seed
+                    // we are testing, create seeds-by-region map for one region, one seed
 
-                String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port;
-                seedsByRegion.put( currentRegion, seed );
-                logger.info("Akka testing, only starting one seed");
+                    String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + port;
+                    seedsByRegion.put( currentRegion, seed );
+                    logger.info( "Akka testing, only starting one seed" );
 
-            } else { // create seeds-by-region map
+                } else { // create seeds-by-region map
 
-                for (String regionSeed : regionSeeds) {
+                    for (String regionSeed : regionSeeds) {
 
-                    String[] parts = regionSeed.split( ":" );
-                    String region = parts[0];
-                    String hostname = parts[1];
-                    String regionPortString = parts[2];
+                        String[] parts = regionSeed.split( ":" );
+                        String region = parts[0];
+                        String hostname = parts[1];
+                        String regionPortString = parts[2];
 
-                    // all seeds in same region must use same port
-                    // we assume 0th seed has the right port
-                    final Integer regionPort;
+                        // all seeds in same region must use same port
+                        // we assume 0th seed has the right port
+                        final Integer regionPort;
 
-                    if (port == null) {
-                        regionPort = Integer.parseInt( regionPortString );
-                    } else {
-                        regionPort = port; // unless we are testing
-                    }
+                        if (port == null) {
+                            regionPort = Integer.parseInt( regionPortString );
+                        } else {
+                            regionPort = port; // unless we are testing
+                        }
 
-                    String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + regionPort;
+                        String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + regionPort;
 
-                    logger.info("Adding seed {} for region {}", seed, region );
+                        logger.info( "Adding seed {} for region {}", seed, region );
 
-                    seedsByRegion.put( region, seed );
-                }
+                        seedsByRegion.put( region, seed );
+                    }
 
-                if (seedsByRegion.keySet().isEmpty()) {
-                    throw new RuntimeException(
-                        "No seeds listed in 'parsing collection.akka.region.seeds' property." );
+                    if (seedsByRegion.keySet().isEmpty()) {
+                        throw new RuntimeException(
+                            "No seeds listed in 'parsing collection.akka.region.seeds' property." );
+                    }
                 }
+
+            } catch (Exception e) {
+                throw new RuntimeException( "Error 'parsing collection.akka.region.seeds' property", e );
             }
+        }
+
+        return seedsByRegion;
+    }
 
-            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
 
-            // read config file once for each region
+    /**
+     * Read cluster config and add seed nodes to it.
+     */
+    private Config readClusterSystemConfig() {
 
-            for ( String region : seedsByRegion.keySet() ) {
+        Config config = null;
 
-                List<String> seeds = seedsByRegion.get( region );
-                int lastColon = seeds.get(0).lastIndexOf(":") + 1;
-                final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon ));
+        try {
 
-                // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions
-                String clusterRole = currentRegion.equals( region ) ? "io" : "client";
+            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
 
-                logger.info( "Akka Config for region {} is:\n" +
-                        "   Hostname {}\n" +
-                        "   Seeds {}\n" +
-                        "   Authoritative Region {}\n",
-                    region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
+            String region = currentRegion;
 
-                Map<String, Object> configMap = new HashMap<String, Object>() {{
+            List<String> seeds = getSeedsByRegion().get( region );
+            int lastColon = seeds.get(0).lastIndexOf(":") + 1;
+            final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon ));
 
-                    put( "akka", new HashMap<String, Object>() {{
+            logger.info( "Akka Config for region {} is:\n" +
+                    "   Hostname {}\n" +
+                    "   Seeds {}\n" +
+                    "   Authoritative Region {}\n",
+                region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
 
-                        put( "remote", new HashMap<String, Object>() {{
-                            put( "netty.tcp", new HashMap<String, Object>() {{
-                                put( "hostname", hostname );
-                                put( "bind-hostname", hostname );
-                                put( "port", regionPort );
-                            }} );
-                        }} );
+            Map<String, Object> configMap = new HashMap<String, Object>() {{
 
-                        put( "cluster", new HashMap<String, Object>() {{
-                            put( "max-nr-of-instances-per-node", 300);
-                            put( "roles", Collections.singletonList(clusterRole) );
-                            put( "seed-nodes", new ArrayList<String>() {{
-                                for (String seed : seeds) {
-                                    add( seed );
-                                }
-                            }} );
-                        }} );
+                put( "akka", new HashMap<String, Object>() {{
 
+                    put( "remote", new HashMap<String, Object>() {{
+                        put( "netty.tcp", new HashMap<String, Object>() {{
+                            put( "hostname", hostname );
+                            put( "bind-hostname", hostname );
+                            put( "port", regionPort );
+                        }} );
                     }} );
-                }};
 
-                for ( RouterProducer routerProducer : routerProducers ) {
-                    routerProducer.addConfiguration( configMap );
-                }
+                    put( "cluster", new HashMap<String, Object>() {{
+                        put( "max-nr-of-instances-per-node", numInstancesPerNode);
+                        put( "roles", Collections.singletonList("io") );
+                        put( "seed-nodes", new ArrayList<String>() {{
+                            for (String seed : seeds) {
+                                add( seed );
+                            }
+                        }} );
+                    }} );
 
-                Config config = ConfigFactory.parseMap( configMap )
-                    .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]" ) )
-                    .withFallback( ConfigFactory.load( "application.conf" ) );
+                }} );
+            }};
 
-                configs.put( region, config );
+            for ( RouterProducer routerProducer : routerProducers ) {
+                routerProducer.addConfiguration( configMap );
             }
 
+            config = ConfigFactory.parseMap( configMap )
+                .withFallback( ConfigFactory.load( "application.conf" ) );
+
+
         } catch ( Exception e ) {
-            throw new RuntimeException("Error 'parsing collection.akka.region.seeds' property", e );
+            throw new RuntimeException("Error reading and adding to cluster config", e );
         }
 
-        return configs;
+        return config;
     }
 
 
     /**
-     * Create ActorSystem and ClusterSingletonProxy for every region.
-     * Create ClusterSingletonManager for the current region.
-     *
-     * @param configMap Configurations to be used to create ActorSystems
-     * @param systemMap Map of ActorSystems created by this method
-     *
-     * @return ActorSystem for this region.
+     * Create actor system for this region, with cluster singleton manager & proxy.
      */
-    private ActorSystem createClusterSingletonProxies(
-        Map<String, Config> configMap, Map<String, ActorSystem> systemMap ) {
-
-        ActorSystem localSystem = null;
+    private ActorSystem createClusterSystemsFromConfigs( Config config ) {
 
-        for ( String region : configMap.keySet() ) {
-            Config config = configMap.get( region );
+        ActorSystem system = ActorSystem.create( "ClusterSystem", config );
 
-            ActorSystem system = ActorSystem.create( "ClusterSystem", config );
-            systemMap.put( region, system );
-
-            // cluster singletons only run role "io" nodes and NOT on "client" nodes of other regions
-            if ( currentRegion.equals( region ) ) {
-
-                localSystem = system;
-
-                for ( RouterProducer routerProducer : routerProducers ) {
-                    routerProducer.createClusterSingletonManager( system );
-                }
-            }
+        for ( RouterProducer routerProducer : routerProducers ) {
+            logger.info("Creating {} for region {}", routerProducer.getName(), currentRegion );
+            routerProducer.createClusterSingletonManager( system );
+        }
 
-            for ( RouterProducer routerProducer : routerProducers ) {
-                routerProducer.createClusterSingletonProxy( system );
-            }
+        for ( RouterProducer routerProducer : routerProducers ) {
+            logger.info("Creating {} proxy for region {} role 'io'", routerProducer.getName(), currentRegion);
+            routerProducer.createClusterSingletonProxy( system, "io" );
         }
 
-        return localSystem;
+        return system;
     }
 
 
     /**
      * Create RequestActor for each region.
-     *
-     * @param systemMap Map of regions to ActorSystems.
      */
-    private void createRequestActors( Map<String, ActorSystem> systemMap ) {
+    private void createClientActors( ActorSystem system ) {
+
+        for ( String region : getSeedsByRegion().keySet() ) {
+
+            if ( currentRegion.equals( region )) {
+
+                logger.info( "Creating clientActor for region {}", region );
+
+                // Each clientActor needs to know path to ClusterSingletonProxy and region
+                clientActor = system.actorOf(
+                    Props.create( ClientActor.class, routersByMessageType ), "clientActor" );
 
-        for ( String region : systemMap.keySet() ) {
+                ClusterClientReceptionist.get(system).registerService( clientActor );
 
-            logger.info("Creating request actor for region {}", region);
+            } else {
 
-            // Each RequestActor needs to know path to ClusterSingletonProxy and region
-            ActorRef requestActor = systemMap.get( region ).actorOf(
-                //Props.create( ClientActor.class, "/user/uvProxy" ), "requestActor" );
-                Props.create( ClientActor.class, routersByMessageType ), "requestActor" );
+                List<String> regionSeeds = getSeedsByRegion().get( region );
+                Set<ActorPath> seedPaths = new HashSet<>(20);
+                for ( String seed : getSeedsByRegion().get( region ) ) {
+                    seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist") );
+                }
+
+                ActorRef clusterClient = system.actorOf( ClusterClient.props(
+                    ClusterClientSettings.create(system).withInitialContacts( seedPaths )), "client");
+
+                clusterClientsByRegion.put( region, clusterClient );
+            }
 
-            requestActorsByRegion.put( region, requestActor );
         }
     }
 
 
     @Override
-    public void waitForRequestActors() {
+    public void waitForClientActors() {
 
-        for ( String region : requestActorsByRegion.keySet() ) {
-            ActorRef ra = requestActorsByRegion.get( region );
-            waitForRequestActor( ra );
-        }
+        waitForClientActor( clientActor );
     }
 
-
-    private void waitForRequestActor( ActorRef ra ) {
+    private void waitForClientActor( ActorRef ra ) {
 
         logger.info( "Waiting on request actor {}...", ra.path() );
 
-        boolean started = false;
+        started = false;
+
         int retries = 0;
         int maxRetries = 60;
         while (retries < maxRetries) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
index 3aa91cf..d849dd9 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 public interface RouterProducer {
 
+    String getName();
+
     /**
      * Create cluster single manager for current region.
      * Will be called once per router per JVM.
@@ -34,16 +36,16 @@ public interface RouterProducer {
      * Create cluster singleton proxy for region.
      * Will be called once per router per JVM per region.
      */
-    void createClusterSingletonProxy( ActorSystem system );
+    void createClusterSingletonProxy( ActorSystem system, String role );
 
     /**
      * Create other actors needed to support the router produced by the implementation.
      */
-    void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap );
+    void createLocalSystemActors( ActorSystem localSystem );
 
     /**
      * Add configuration for the router to configuration map
      */
-    void addConfiguration( Map<String, Object> configMap );
+    void addConfiguration(Map<String, Object> configMap );
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf
index a243163..5706610 100644
--- a/stack/corepersistence/actorsystem/src/main/resources/application.conf
+++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf
@@ -38,7 +38,12 @@ akka {
 akka.cluster.metrics.enabled=off
 
 # Enable metrics extension in akka-cluster-metrics.
-akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"]
+akka.extensions=[
+    "akka.cluster.metrics.ClusterMetricsExtension",
+    "akka.cluster.pubsub.DistributedPubSub",
+    "akka.cluster.client.ClusterClientReceptionist"
+]
+
 
 # Sigar native library extract location during tests.
 # Note: use per-jvm-instance folder when running multiple jvm on one host.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf b/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
deleted file mode 100644
index 907aebb..0000000
--- a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-include "application"
-
-akka.actor.deployment {
-  /uvRouter/singleton/router {
-    router = consistent-hashing-pool
-    cluster {
-      enabled = on
-      allow-local-routees = on
-      
-      # singleton will only run on nodes with role "io"
-      use-role = io
-
-      # more forgiving failure detector
-      failure-detector {
-        threshold = 10
-        acceptable-heartbeat-pause = 3 s
-        heartbeat-interval = 1 s
-        heartbeat-request {
-          expected-response-after = 3 s
-        }
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index bb30b92..e53710c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -59,7 +59,7 @@ public class UniqueValueActor extends UntypedActor {
 
             count++;
             if (count % 10 == 0) {
-                logger.debug( "UniqueValueActor {} processed {} requests", name, count );
+                logger.info( "UniqueValueActor {} processed {} requests", name, count );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 119d6f6..b888b1f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.cluster.client.ClusterClient;
 import akka.cluster.singleton.ClusterSingletonManager;
 import akka.cluster.singleton.ClusterSingletonManagerSettings;
 import akka.cluster.singleton.ClusterSingletonProxy;
@@ -75,18 +76,25 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     }
 
 
-    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
-
-        for ( String region : systemMap.keySet() ) {
-            ActorSystem actorSystem = systemMap.get( region );
-            if ( !actorSystem.equals( localSystem ) ) {
-                logger.info("Starting ReservationCacheUpdater for {}", region );
-                actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
-            }
-        }
+    @Override
+    public String getName() {
+        return "UniqueValues ClusterSingleton Router";
     }
 
 
+    // TODO: restore reservation cache
+//    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
+//
+//        for ( String region : systemMap.keySet() ) {
+//            ActorSystem actorSystem = systemMap.get( region );
+//            if ( !actorSystem.equals( localSystem ) ) {
+//                logger.info("Starting ReservationCacheUpdater for {}", region );
+//                actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
+//            }
+//        }
+//    }
+
+
     @Override
     public void reserveUniqueValues(
         ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException {
@@ -154,12 +162,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     private void reserveUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for region " + region);
-        }
-
         UniqueValueActor.Request request = new UniqueValueActor.Reservation(
             scope, entity.getId(), version, field );
 
@@ -171,39 +173,35 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             throw new UniqueValueException( "Error property not unique (cache)", field);
         }
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( entity, region, request );
     }
 
 
     private void confirmUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
-        }
-
         UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation(
             scope, entity.getId(), version, field );
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( entity, region, request );
     }
 
 
     private void cancelUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
-        }
-
         UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation(
             scope, entity.getId(), version, field );
 
-        requestActor.tell( request, null );
+        if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+            ActorRef clientActor = actorSystemManager.getClientActor();
+            clientActor.tell( request, null );
+
+        } else {
+            ActorRef clusterClient = actorSystemManager.getClusterClient( region );
+            clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null );
+        }
+
     }
 
 
@@ -218,7 +216,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
 
     private void sendUniqueValueRequest(
-        Entity entity, ActorRef requestActor, UniqueValueActor.Request request ) throws UniqueValueException {
+        Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException {
 
         int maxRetries = 5;
         int retries = 0;
@@ -230,7 +228,17 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
                 // ask RequestActor and wait (up to timeout) for response
 
-                Future<Object> fut = Patterns.ask( requestActor, request, t );
+                Future<Object> fut;
+
+                if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+                    ActorRef clientActor = actorSystemManager.getClientActor();
+                    fut = Patterns.ask( clientActor, request, t );
+
+                } else {
+                    ActorRef clusterClient = actorSystemManager.getClusterClient( region );
+                    fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t );
+                }
+
                 response = (UniqueValueActor.Response) Await.result( fut, t.duration() );
 
                 if ( response != null && (
@@ -280,34 +288,33 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             ClusterSingletonManagerSettings.create( system ).withRole("io");
 
         system.actorOf( ClusterSingletonManager.props(
-            //Props.create( ClusterSingletonRouter.class, table ),
-            Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class),
-            PoisonPill.getInstance(), settings ), "uvRouter");
+            Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ),
+            PoisonPill.getInstance(), settings ), "uvRouter" );
+
     }
 
 
     @Override
-    public void createClusterSingletonProxy(ActorSystem system) {
+    public void createClusterSingletonProxy( ActorSystem system, String role ) {
 
         ClusterSingletonProxySettings proxySettings =
-            ClusterSingletonProxySettings.create( system ).withRole("io");
+            ClusterSingletonProxySettings.create( system ).withRole( role );
 
         system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" );
     }
 
 
     @Override
-    public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
-        subscribeToReservations( localSystem, systemMap );
+    public void createLocalSystemActors( ActorSystem localSystem ) {
+        // TODO: restore reservation cache
+        //subscribeToReservations( localSystem );
     }
 
     @Override
-    public void addConfiguration(Map<String, Object> configMap) {
+    public void addConfiguration( Map<String, Object> configMap ) {
 
         int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode();
 
-        // TODO: will the below overwrite things other routers have added under "actor.deployment"?
-
         Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
 
         akka.put( "actor", new HashMap<String, Object>() {{
@@ -317,10 +324,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                     put( "cluster", new HashMap<String, Object>() {{
                         put( "enabled", "on" );
                         put( "allow-local-routees", "on" );
-                        put( "user-role", "io" );
+                        put( "use-role", "io" );
                         put( "max-nr-of-instances-per-node", numInstancesPerNode );
                         put( "failure-detector", new HashMap<String, Object>() {{
-                            put( "threshold", "" );
+                            put( "threshold", "10" );
                             put( "acceptable-heartbeat-pause", "3 s" );
                             put( "heartbeat-interval", "1 s" );
                             put( "heartbeat-request", new HashMap<String, Object>() {{