You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/26 18:12:39 UTC

[12/50] [abbrv] usergrid git commit: Simplify and rename Actorsystem configuration properties to be more generic, e.g. starting with "usergrid.cluster" instead of "collection.akka"

Simplify and rename Actorsystem configuration properties to be more generic, e.g. starting with "usergrid.cluster" instead of "collection.akka"


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f0c9fd4b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f0c9fd4b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f0c9fd4b

Branch: refs/heads/datastax-cass-driver
Commit: f0c9fd4bd91a271ee1e9a93a6fa70bf69159f7db
Parents: 2d5ad05
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jul 1 11:09:37 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jul 1 11:09:37 2016 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  56 ++---
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../persistence/actorsystem/ActorSystemFig.java |  60 ++---
 .../actorsystem/ActorSystemManagerImpl.java     |  53 ++---
 .../mvcc/stage/write/WriteCommit.java           |  10 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  10 +-
 .../uniquevalues/UniqueValuesFig.java           |  28 +--
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   8 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   2 +-
 .../org/apache/usergrid/rest/UniqueCatsIT.java  | 237 +++++++++++++++++++
 11 files changed, 344 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 29b8d36..fe70569 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -410,60 +410,44 @@ usergrid.queue.lock.timeout=5
 #usergrid.queue.publish.queuesize=850000
 
 
-#########################  Akka Actor System Configiuration ###################
+#########################  Usergrid Cluster Configuration ###################
 #
-# Usergrid includes Akka, an Actor-based system that allows for the
-# distribution of work across multiple Usergrid instances and multiple regions.
-#
-# All properties are required. If Akka is enabled then all properties in this
-# section MUST be specified.
-#
-# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
+# Usergrid includes a multi-region clustering system.
+# To user it you must specify your region, the list of regions and seeds for each region.
 #
 
-# Currently, Akka is disable and not required for Usergrid
-collection.akka.enabled=false
+# This is an experimentation new feature, disabled by default
+usergrid.cluster.enabled=false
 
-# host name of this machine
-collection.akka.hostname=localhost
+# Comma-separated list of regions to be considered
+usergrid.cluster.region.list=default
 
-# The region of this Usergrid installation
-# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property
-collection.akka.region=
+# The regions of this local instance of Usergrid
+usergrid.cluster.region.local=default
 
-# Comma-separated lists of Akka seeds each with format {region}:{hostname}:{port}.
-# All regions MUST be listed in the 'usergrid.queue.regionList'
-collection.akka.region.seeds=
-
-# The default authoritative region for when is not specified elsewhere
-# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property
-collection.akka.authoritative.region=
+# Comma-separated lists of cluster seeds each with format {region}:{hostname}
+usergrid.cluster.seeds=default:localhost
 
-# Default number of Akka actors to start per instance / router producer
-collection.akka.instances-per-node=300
+# Port used for cluster communications.
+usergrid.cluster.port=2551
 
 
 #########################  Usergrid Unique Values Validation ##################
 #
-# Usergrid includes a distributed unique values validation that ensure that
-# unique values rename unique across a distributed and multi-region system.
-# This system is based on the Akka actor system and requires some additional
-# configuration.
-#
-# The system uses consistent hashing to ensure that one single-threaded actor
-# ever accesses a unique value record at one time.
-#
-# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
+# These only apply if the above Usergrid cluster system is enabled.
 #
 
 # The number of unique value actors to start on each Usergrid instance.
-collection.akka.uniquevalue.actors=300
+collection.uniquevalues.actors=300
 
 # TTL of unique value reservation in in-memory cache
-collection.akka.uniquevalue.cache.ttl=10
+collection.uniquevalues.cache.ttl=10
 
 # TTL of a unique value reservation when written to Cassandra
-collection.akka.uniquevalue.reservation.ttl=10
+collection.uniquevalues.reservation.ttl=10
+
+# The default authoritative region for when is not specified elsewhere
+collection.uniquevalues.authoritative.region=default
 
 
 ##############################  Usergrid Scheduler  ###########################

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index eca5927..e70a6fd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -142,7 +142,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         logger.info("EntityManagerFactoring starting...");
 
-        if ( actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig.getEnabled() ) {
             try {
                 logger.info("Akka cluster starting...");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index ec010d0..5d7b6aa 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -30,64 +30,54 @@ import java.io.Serializable;
 @FigSingleton
 public interface ActorSystemFig extends GuicyFig, Serializable {
 
-    String AKKA_ENABLED = "collection.akka.enabled";
+    String CLUSTER_ENABLED = "usergrid.cluster.enabled";
 
-    String AKKA_HOSTNAME = "collection.akka.hostname";
+    String CLUSTER_REGIONS_LIST = "usergrid.cluster.region.list";
 
-    String AKKA_REGION = "collection.akka.region";
+    String CLUSTER_REGIONS_LOCAL = "usergrid.cluster.region.local";
 
-    String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues
+    String CLUSTER_SEEDS = "usergrid.cluster.seeds";
 
-    String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
-
-    String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region";
-
-    String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node";
+    String CLUSTER_PORT = "usergrid.cluster.port";
 
 
     /**
-     * Use Akka or nah
+     * Use Cluster or nah
      */
-    @Key(AKKA_ENABLED)
+    @Key(CLUSTER_ENABLED)
     @Default("true")
-    boolean getAkkaEnabled();
-
-    /**
-     * Hostname to be used in Akka configuration.
-     */
-    @Key(AKKA_HOSTNAME)
-    String getHostname();
+    boolean getEnabled();
 
     /**
      * Local region to be used in Akka configuration.
      */
-    @Key(AKKA_REGION)
-    String getRegion();
+    @Key(CLUSTER_REGIONS_LOCAL)
+    @Default("default")
+    String getRegionLocal();
 
     /**
      * Comma separated list of regions known to cluster.
      */
-    @Key(AKKA_REGION_LIST)
-    String getRegionList();
+    @Key(CLUSTER_REGIONS_LIST)
+    @Default("default")
+    String getRegionsList();
 
     /**
-     * Comma-separated lists of seeds each with format {region}:{hostname}:{port}.
-     * Regions MUST be listed in the 'usergrid.queue.regionList'
+     * Comma-separated lists of seeds each with format {region}:{hostname}
      */
-    @Key(AKKA_REGION_SEEDS)
-    String getRegionSeeds();
+    @Key(CLUSTER_SEEDS)
+    @Default("default:localhost")
+    String getSeeds();
 
     /**
-     * If no region specified for type, use the authoritative region
+     * Port for cluster comms.
      */
-    @Key(AKKA_AUTHORITATIVE_REGION)
-    String getAkkaAuthoritativeRegion();
+    @Key(CLUSTER_PORT)
+    @Default("2551")
+    String getPort();
 
 
-    /**
-     * Number of actor instances to create on each node for each router.
-     */
-    @Key(AKKA_INSTANCES_PER_NODE)
-    @Default("300")
-    int getInstancesPerNode();
+    @Key("usergrid.cluster.hostname")
+    @Default("")
+    String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 05f837d..a79f447 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -77,8 +79,17 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     @Override
     public void start() {
 
-        this.hostname = actorSystemFig.getHostname();
-        this.currentRegion = actorSystemFig.getRegion();
+        if ( !StringUtils.isEmpty( actorSystemFig.getHostname()) ) {
+            this.hostname = actorSystemFig.getHostname();
+        } else {
+            try {
+                this.hostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                logger.error("Cannot get hostname, defaulting to 'localhost': " + e.getMessage());
+            }
+        }
+
+        this.currentRegion = actorSystemFig.getRegionLocal();
         this.port = null;
 
         initAkka();
@@ -155,32 +166,22 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
         // Create one actor system with request actor for each region
 
-        if ( StringUtils.isEmpty( hostname )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_HOSTNAME );
-        }
-
         if ( StringUtils.isEmpty( currentRegion )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION );
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL );
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getRegionList() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_LIST );
+        if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) {
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST );
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getRegionSeeds() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_SEEDS);
+        if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) {
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS );
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getAkkaAuthoritativeRegion() )) {
-            logger.warn("No value for {} specified, will use current region as authoriative region",
-                ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
-            //throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
-        }
-
-        List regionList = Arrays.asList( actorSystemFig.getRegionList().toLowerCase().split(",") );
+        List regionList = Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",") );
 
         logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}",
-            hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() );
+            hostname, currentRegion, regionList, actorSystemFig.getSeeds() );
 
         Config config = readClusterSystemConfig();
 
@@ -205,7 +206,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             seedsByRegion = ArrayListMultimap.create();
 
-            String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
+            String[] regionSeeds = actorSystemFig.getSeeds().split( "," );
 
             logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );
 
@@ -226,7 +227,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                         String[] parts = regionSeed.split( ":" );
                         String region = parts[0];
                         String hostname = parts[1];
-                        String regionPortString = parts[2];
+
+                        String regionPortString = parts.length > 2 ? parts[2] : actorSystemFig.getPort();
 
                         // all seeds in same region must use same port
                         // we assume 0th seed has the right port
@@ -269,7 +271,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
         try {
 
-            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
+            int numInstancesPerNode = 300; // expect this to be overridden by RouterProducers
 
             String region = currentRegion;
 
@@ -277,11 +279,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
             int lastColon = seeds.get(0).lastIndexOf(":") + 1;
             final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon ));
 
-            logger.info( "Akka Config for region {} is:\n" +
-                    "   Hostname {}\n" +
-                    "   Seeds {}\n" +
-                    "   Authoritative Region {}\n",
-                region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
+            logger.info( "Akka Config for region {} is:\n" + "   Hostname {}\n" + "   Seeds {}\n",
+                region, hostname, seeds );
 
             Map<String, Object> configMap = new HashMap<String, Object>() {{
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 65d1734..5b98ca5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
     private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class );
 
     ActorSystemFig actorSystemFig;
+    UniqueValuesFig uniqueValuesFig;
     UniqueValuesService akkaUvService;
 
     @Inject
@@ -82,6 +84,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
                         final MvccEntitySerializationStrategy entryStrat,
                         final UniqueValueSerializationStrategy uniqueValueStrat,
                         final ActorSystemFig actorSystemFig,
+                        final UniqueValuesFig uniqueValuesFig,
                         final UniqueValuesService akkaUvService ) {
 
         Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
@@ -92,6 +95,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         this.entityStrat = entryStrat;
         this.uniqueValueStrat = uniqueValueStrat;
         this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
         this.akkaUvService = akkaUvService;
     }
 
@@ -130,13 +134,13 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         logMutation.mergeShallow( entityMutation );
 
         // akkaFig may be null when this is called from JUnit tests
-        if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
             String region = ioEvent.getRegion();
             if ( region == null ) {
-                region = actorSystemFig.getAkkaAuthoritativeRegion();
+                region = uniqueValuesFig.getAuthoritativeRegion();
             }
             if ( region == null ) {
-                region = actorSystemFig.getRegion();
+                region = actorSystemFig.getRegionLocal();
             }
             confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index f159096..985137b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,6 +40,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -63,6 +64,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class );
 
     ActorSystemFig actorSystemFig;
+    UniqueValuesFig uniqueValuesFig;
     UniqueValuesService akkaUvService;
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
@@ -83,11 +85,13 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
                              final Keyspace keyspace,
                              final CassandraConfig cassandraFig,
                              final ActorSystemFig actorSystemFig,
+                             final UniqueValuesFig uniqueValuesFig,
                              final UniqueValuesService akkaUvService ) {
 
         this.keyspace = keyspace;
         this.cassandraFig = cassandraFig;
         this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
         this.akkaUvService = akkaUvService;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
@@ -102,7 +106,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
     @Override
     public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
-        if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
             verifyUniqueFieldsAkka( ioevent );
         } else {
             verifyUniqueFields( ioevent );
@@ -121,10 +125,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         String region = ioevent.getRegion();
         if ( region == null ) {
-            region = actorSystemFig.getAkkaAuthoritativeRegion();
+            region = uniqueValuesFig.getAuthoritativeRegion();
         }
         if ( region == null ) {
-            region = actorSystemFig.getRegion();
+            region = actorSystemFig.getRegionLocal();
         }
         try {
             akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
index c99824f..edd0cbe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
@@ -28,40 +28,40 @@ import java.io.Serializable;
 @FigSingleton
 public interface UniqueValuesFig extends GuicyFig, Serializable {
 
-    String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
+    String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors";
 
-    String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+    String UNIQUEVALUE_CACHE_TTL = "collection.uniquevalues.cache.ttl";
 
-    String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+    String UNIQUEVALUE_RESERVATION_TTL= "collection.uniquevalues.reservation.ttl";
 
-    String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node";
+    String UNIQUEVALUE_AUTHORITATIVE_REGION = "collection.uniquevalues.authoritative.region";
 
 
     /**
-     * Number of UniqueValueActors to be started on each node
-     */
-    @Key(AKKA_UNIQUEVALUE_ACTORS)
-    @Default("300")
-    int getUniqueValueActors();
-
-    /**
      * Unique Value cache TTL in seconds.
      */
-    @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
+    @Key(UNIQUEVALUE_CACHE_TTL)
     @Default("10")
     int getUniqueValueCacheTtl();
 
     /**
      * Unique Value Reservation TTL in seconds.
      */
-    @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
+    @Key(UNIQUEVALUE_RESERVATION_TTL)
     @Default("10")
     int getUniqueValueReservationTtl();
 
     /**
      * Number of actor instances to create on each.
      */
-    @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE)
+    @Key(UNIQUEVALUE_ACTORS)
     @Default("300")
     int getUniqueValueInstancesPerNode();
+
+    /**
+     * Primary authoritative region (used if none other specified).
+     */
+    @Key(UNIQUEVALUE_AUTHORITATIVE_REGION)
+    @Default("default")
+    String getAuthoritativeRegion();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index e7cee21..a0ee6be 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -71,7 +71,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null);
 
 
         //verify the observable is correct

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 8665ee9..dcc473c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -84,10 +84,12 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null );
+        WriteCommit newStage =
+            new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null );
 
 
-        Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
+        Entity result = newStage.call(
+            new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
 
 
         //verify the log entry is correct
@@ -131,7 +133,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
                 .thenReturn( entityMutation );
 
-        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ).call( event );
+        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null, null ).call( event );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 635e262..46cfde1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -96,7 +96,7 @@ public class WriteUniqueVerifyTest extends AbstractUniqueValueTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig, null, null, null );
 
        newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
new file mode 100644
index 0000000..0120660
--- /dev/null
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.rest;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.ConnectException;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+
+/**
+ * Tests that Catgrid will not allow creation of entities with duplicate names.
+ *
+ * Intended for use against a production-like cluster, not run during normal JUnit testing.
+ *
+ * Comment out the @Ignore annotation below and edit to add your target hosts.
+ */
+public class UniqueCatsIT {
+    private static final Logger logger = LoggerFactory.getLogger( UniqueCatsIT.class );
+
+    private static final AtomicInteger successCounter = new AtomicInteger( 0 );
+    private static final AtomicInteger errorCounter = new AtomicInteger( 0 );
+    private static final AtomicInteger dupCounter = new AtomicInteger( 0 );
+
+    @Test
+    //@Ignore("Intended for use against  prod-like cluster")
+    public void testDuplicatePrevention() throws Exception {
+
+        int numThreads = 20;
+        int poolSize = 20;
+        int numCats = 100;
+
+        Multimap<String, String> catsCreated = Multimaps.synchronizedMultimap( HashMultimap.create() );
+        Multimap<String, Map<String, Object>> dupsRejected = Multimaps.synchronizedMultimap( HashMultimap.create() );
+
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        Client client = ClientBuilder.newClient();
+
+        final MetricRegistry metrics = new MetricRegistry();
+        final Timer responses = metrics.timer(name(UniqueCatsIT.class, "responses"));
+        long startTime = System.currentTimeMillis();
+
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        //String[] targetHosts = {"http://localhost:8080"};
+
+        String[] targetHosts = {
+            "https://ug21-west.e2e.apigee.net",
+            "https://ug21-east.e2e.apigee.net"
+        };
+
+        for (int i = 0; i < numCats; i++) {
+
+            if ( failed.get() ) { break; }
+
+            String randomizer = RandomStringUtils.randomAlphanumeric( 8 );
+
+            // multiple threads simultaneously trying to create a cat with the same propertyName
+            for (int j = 0; j < numThreads; j++) {
+
+                if ( failed.get() ) { break; }
+
+                final String name = "uv_test_cat_" + randomizer;
+                final String host = targetHosts[ j % targetHosts.length ];
+
+                execService.submit( () -> {
+
+                    Map<String, Object> form = new HashMap<String, Object>() {{
+                        put("name", name);
+                    }};
+
+                    Timer.Context time = responses.time();
+                    try {
+                        WebTarget target = client.target( host ).path(
+                            //"/test-organization/test-app/cats" );
+                            "/dmjohnson/sandbox/cats" );
+
+                        //logger.info("Posting cat {} to host {}", catname, host);
+
+                        Response response = target.request()
+                            //.post( Entity.entity( form, MediaType.APPLICATION_FORM_URLENCODED ));
+                            .post( Entity.entity( form, MediaType.APPLICATION_JSON));
+
+                        org.apache.usergrid.rest.test.resource.model.ApiResponse apiResponse = null;
+                        String responseAsString = "";
+                        if ( response.getStatus() >= 400 ) {
+                            responseAsString = response.readEntity( String.class );
+                        } else {
+                            apiResponse = response.readEntity(
+                                org.apache.usergrid.rest.test.resource.model.ApiResponse.class );
+                        }
+
+                        if ( response.getStatus() == 200 || response.getStatus() == 201 ) {
+                            catsCreated.put( name, apiResponse.getEntity().getUuid().toString() );
+                            successCounter.incrementAndGet();
+
+                        } else if ( response.getStatus() == 400
+                                && responseAsString.contains("DuplicateUniquePropertyExistsException")) {
+                            dupsRejected.put( name, form );
+                            dupCounter.incrementAndGet();
+
+                        } else {
+                            logger.error("Cat creation failed status {} message {}",
+                                response.getStatus(), responseAsString );
+                            errorCounter.incrementAndGet();
+                        }
+
+                    } catch ( ProcessingException e ) {
+                        errorCounter.incrementAndGet();
+                        if ( e.getCause() instanceof ConnectException ) {
+                            logger.error("Error connecting to " + host);
+                        } else {
+                            logger.error( "Error", e );
+                        }
+
+                    } catch ( Exception e ) {
+                        errorCounter.incrementAndGet();
+                        logger.error("Error", e);
+                    }
+                    time.stop();
+
+                } );
+            }
+        }
+        execService.shutdown();
+
+        try {
+            while (!execService.awaitTermination( 60, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting..." );
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        logger.info( "Total time {}s", (endTime - startTime) / 1000 );
+
+        DecimalFormat format = new DecimalFormat("##.###");
+
+        logger.info( "Timed {} requests:\n" +
+                        "mean rate {}/s\n" +
+                        "min       {}s\n" +
+                        "max       {}s\n" +
+                        "mean      {}s",
+                responses.getCount(),
+                format.format( responses.getMeanRate() ),
+                format.format( (double)responses.getSnapshot().getMin()  / 1000000000 ),
+                format.format( (double)responses.getSnapshot().getMax()  / 1000000000 ),
+                format.format( responses.getSnapshot().getMean() / 1000000000 )
+        );
+
+        logger.info( "Error count {} ratio = {}",
+                errorCounter.get(), (float) errorCounter.get() / (float) responses.getCount() );
+
+        logger.info( "Success count = {}", successCounter.get() );
+
+        logger.info( "Rejected dup count = {}", dupCounter.get() );
+
+//        for ( String catname : catsCreated.keys() ) {
+//            System.out.println( catname );
+//            Collection<Cat> cats = catsCreated.get( catname );
+//            for ( Cat cat : cats ) {
+//                System.out.println("   " + cat.getUuid() );
+//            }
+//        }
+
+//        int count = 0;
+//        for ( String catname : dupsRejected.keySet() ) {
+//            System.out.println( catname );
+//            Collection<Cat> cats = dupsRejected.get( catname );
+//            for ( Cat cat : cats ) {
+//                System.out.println("   " + (count++) + " rejected " + cat.getCatname() + ":" + cat.getUuid() );
+//            }
+//        }
+
+        int catCount = 0;
+        int catnamesWithDuplicates = 0;
+        for ( String name : catsCreated.keySet() ) {
+            //Collection<Map<String, String>> forms =
+            Collection<String> forms = catsCreated.get( name );
+            if ( forms.size() > 1 ) {
+                catnamesWithDuplicates++;
+                logger.info("Duplicate " + name);
+            }
+            catCount++;
+        }
+        Assert.assertEquals( 0, catnamesWithDuplicates );
+        Assert.assertEquals( 0, errorCounter.get() );
+        Assert.assertEquals( numCats, successCounter.get() );
+        Assert.assertEquals( numCats, catCount );
+
+
+    }
+
+}