You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/11 15:35:25 UTC

[15/50] [abbrv] usergrid git commit: Fix authoritative region logic and add property for specifying the system-wide authoritative region.

Fix authoritative region logic and add property for specifying the system-wide authoritative region.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58fc657c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58fc657c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58fc657c

Branch: refs/heads/release-2.1.1
Commit: 58fc657cb501312934ba7d9888ed31affbe3fcb7
Parents: 0c5deac
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jun 17 12:43:18 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jun 17 12:43:18 2016 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  7 +++++--
 .../corepersistence/CpEntityManager.java        | 14 ++++++++++---
 .../corepersistence/CpEntityManagerFactory.java | 22 ++++++++++++++------
 .../service/ApplicationServiceImpl.java         |  9 +++++---
 .../collection/EntityCollectionManager.java     |  3 ++-
 .../impl/EntityCollectionManagerImpl.java       |  6 +++---
 .../mvcc/stage/write/WriteCommit.java           |  6 +++++-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  6 +++++-
 .../collection/uniquevalues/AkkaFig.java        | 10 ++++-----
 .../uniquevalues/ReservationCache.java          |  5 +++++
 .../collection/EntityCollectionManagerIT.java   |  2 +-
 11 files changed, 63 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d7a3311..ffe5fc1 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -443,16 +443,19 @@ collection.akka.region.seeds=us-east-1:localhost:2551
 # The number of unique value actors to start on each Usergrid instance.
 collection.akka.uniquevalue.actors=300
 
-# TTL of unique value reservastion in in-memory cache
+# TTL of unique value reservation in in-memory cache
 collection.akka.uniquevalue.cache.ttl=10
 
 # TTL of a unique value reservation when written to Cassandra
 collection.akka.uniquevalue.reservation.ttl=10
 
+# If no region is specified for a type, use the authoritative region
+#collection.akka.uniquevalue.authoritative.region=
+
 
 ##############################  Usergrid Scheduler  ###########################
 #
-# Usergrid uses a scheduler for some functions such as scheduled push notificatins.
+# Usergrid uses a scheduler for some functions such as scheduled push notifications.
 # Use the below settings to configure the scheduler.
 #
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b33321b..b0d3f59 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -43,6 +43,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntitySet;
 import org.apache.usergrid.persistence.collection.FieldSet;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.*;
@@ -104,6 +105,8 @@ public class CpEntityManager implements EntityManager {
 
     private final UUID applicationId;
     private final EntityManagerFig entityManagerFig;
+    private final AkkaFig akkaFig;
+
     private Application application;
 
 
@@ -168,6 +171,7 @@ public class CpEntityManager implements EntityManager {
                             final AsyncEventService indexService,
                             final ManagerCache managerCache,
                             final MetricsFactory metricsFactory,
+                            final AkkaFig akkaFig,
                             final EntityManagerFig entityManagerFig,
                             final GraphManagerFactory graphManagerFactory,
                             final CollectionService collectionService,
@@ -176,6 +180,7 @@ public class CpEntityManager implements EntityManager {
                             final UUID applicationId ) {
 
         this.entityManagerFig = entityManagerFig;
+        this.akkaFig = akkaFig;
 
         Preconditions.checkNotNull( cass, "cass must not be null" );
         Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" );
@@ -249,8 +254,7 @@ public class CpEntityManager implements EntityManager {
      */
     org.apache.usergrid.persistence.model.entity.Entity load( Id entityId ) {
 
-            return ecm .load( entityId ).toBlocking()
-                                       .lastOrDefault(null);
+            return ecm .load( entityId ).toBlocking().lastOrDefault(null);
 
     }
 
@@ -605,7 +609,8 @@ public class CpEntityManager implements EntityManager {
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         //Step 1 & 2 of delete
-        return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
+        String region = this.lookupRegionForType( entityRef.getType() );
+        return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
 
     }
 
@@ -3058,6 +3063,8 @@ public class CpEntityManager implements EntityManager {
     private  String lookupRegionForType( String type ) {
 
         String region = null;
+
+        // get collection settings for type
         MapManager mm = getMapManagerForTypes();
         CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm );
         String collectionName = Schema.defaultCollectionName( type );
@@ -3068,6 +3075,7 @@ public class CpEntityManager implements EntityManager {
         if ( collectionSettings.isPresent() && collectionSettings.get().get("region") != null ) {
             region = collectionSettings.get().get("region").toString();
         }
+
         return region;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 8e8c5e8..bc1b335 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -80,7 +80,9 @@ import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
 public class CpEntityManagerFactory implements EntityManagerFactory, ApplicationContextAware {
 
     private static final Logger logger = LoggerFactory.getLogger( CpEntityManagerFactory.class );
+
     private final EntityManagerFig entityManagerFig;
+    private final AkkaFig akkaFig;
 
     private ApplicationContext applicationContext;
 
@@ -123,6 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         this.injector = injector;
         this.reIndexService = injector.getInstance(ReIndexService.class);
         this.entityManagerFig = injector.getInstance(EntityManagerFig.class);
+        this.akkaFig = injector.getInstance( AkkaFig.class );
         this.managerCache = injector.getInstance( ManagerCache.class );
         this.metricsFactory = injector.getInstance( MetricsFactory.class );
         this.indexService = injector.getInstance( AsyncEventService.class );
@@ -134,9 +137,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         Properties properties = cassandraService.getProperties();
         this.entityManagers = createEntityManagerCache( properties );
 
-        AkkaFig akkaFig = injector.getInstance( AkkaFig.class );
-
-
         logger.info("EntityManagerFactoring starting...");
 
         if ( akkaFig.getAkkaEnabled() ) {
@@ -354,9 +354,19 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
     private EntityManager _getEntityManager( UUID applicationId ) {
 
-        EntityManager em = new CpEntityManager( cassandraService, counterUtils, indexService, managerCache,
-            metricsFactory, entityManagerFig, graphManagerFactory,  collectionService, connectionService,
-            collectionSettingsCacheFactory, applicationId );
+        EntityManager em = new CpEntityManager(
+            cassandraService,
+            counterUtils,
+            indexService,
+            managerCache,
+            metricsFactory,
+            akkaFig,
+            entityManagerFig,
+            graphManagerFactory,
+            collectionService,
+            connectionService,
+            collectionSettingsCacheFactory,
+            applicationId );
 
         return em;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
index 91f11f5..c6b3b15 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -60,7 +61,7 @@ public class ApplicationServiceImpl  implements ApplicationService{
     private final MapManagerFactory mapManagerFactory;
     private final GraphManagerFactory graphManagerFactory;
     private final CollectionSettingsCacheFactory collectionSettingsCacheFactory;
-
+    private final AkkaFig akkaFig;
 
 
     @Inject
@@ -70,7 +71,8 @@ public class ApplicationServiceImpl  implements ApplicationService{
                                   EventBuilder eventBuilder,
                                   MapManagerFactory mapManagerFactory,
                                   GraphManagerFactory graphManagerFactory,
-                                  CollectionSettingsCacheFactory collectionSettingsCacheFactory
+                                  CollectionSettingsCacheFactory collectionSettingsCacheFactory,
+                                  AkkaFig akkaFig
     ){
 
         this.allEntityIdsObservable = allEntityIdsObservable;
@@ -80,6 +82,7 @@ public class ApplicationServiceImpl  implements ApplicationService{
         this.mapManagerFactory = mapManagerFactory;
         this.graphManagerFactory = graphManagerFactory;
         this.collectionSettingsCacheFactory = collectionSettingsCacheFactory;
+        this.akkaFig = akkaFig;
     }
 
 
@@ -112,7 +115,7 @@ public class ApplicationServiceImpl  implements ApplicationService{
         }
 
         countObservable = countObservable.map(id -> {
-            entityCollectionManager.mark((Id) id)
+            entityCollectionManager.mark((Id) id, null )
                 .mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last();
             return id;
         })

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 0dad0d7..b6056b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -51,9 +51,10 @@ public interface EntityCollectionManager {
      * @param entityId MarkCommit the entity as deleted.  Will not actually remove it from cassandra.  This operation will
      * also remove all unique properties for this entity
      *
+     * @param region Region passed along with the mark event; may be null when no region is set for the entity's type
      * @return The observable of the id after the operation has completed
      */
-    Observable<Id> mark( Id entityId );
+    Observable<Id> mark(Id entityId, String region);
 
     /**
      * @param entityId The entity id to load.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 09bd01b..3cc4a07 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -214,14 +214,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
     @Override
-    public Observable<Id> mark( final Id entityId ) {
+    public Observable<Id> mark(final Id entityId, String region) {
 
         Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
 
-        Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
-            .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+        Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) )
+            .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map(
                 entityEvent -> entityEvent.getEvent().getId() );
 
         return ObservableTimer.time( o, deleteTimer );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index abb54c9..44028ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -131,7 +131,11 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
 
         // akkaFig may be null when this is called from JUnit tests
         if ( akkaFig != null && akkaFig.getAkkaEnabled() ) {
-            confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, ioEvent.getRegion() );
+            String region = ioEvent.getRegion();
+            if ( region == null ) {
+                region = akkaFig.getAkkaAuthoritativeRegion();
+            }
+            confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
         } else {
             confirmUniqueFields( mvccEntity, version, applicationScope, logMutation );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index acfc5f6..da394f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -122,8 +122,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         final ApplicationScope applicationScope = ioevent.getEntityCollection();
 
+        String region = ioevent.getRegion();
+        if ( region == null ) {
+            region = akkaFig.getAkkaAuthoritativeRegion();
+        }
         try {
-            akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), ioevent.getRegion() );
+            akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region );
 
         } catch (UniqueValueException e) {
             Map<String, Field> violations = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
index e709920..0f97403 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
@@ -47,8 +47,7 @@ public interface AkkaFig extends GuicyFig, Serializable {
 
     String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
 
-    String AKKA_UNIQUEVALUE_REGION_TYPES = "collection.akka.uniquevalue.region.types";
-
+    String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region";
 
     /**
      * Use Akka or nah
@@ -96,11 +95,10 @@ public interface AkkaFig extends GuicyFig, Serializable {
     String getRegionSeeds();
 
     /**
-     * Authoritative regions may be specified for types
-     * Comma-separated lists of region types each with format {region}:{type}
+     * If no region is specified for a type, use the authoritative region
      */
-    @Key(AKKA_UNIQUEVALUE_REGION_TYPES)
-    String getRegionTypes();
+    @Key(AKKA_AUTHORITATIVE_REGION)
+    String getAkkaAuthoritativeRegion();
 
     /**
      * Unique Value cache TTL in seconds.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
index 8dba606..24b7f6e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
@@ -29,11 +29,13 @@ public class ReservationCache {
     private static final Logger logger = LoggerFactory.getLogger( RequestActor.class );
 
     Cache<String, UniqueValueActor.Reservation> cache;
+    long ttl;
 
     // use hokey old-style singleton because its not that easy to get Guice into an actor
     private static ReservationCache instance = null;
 
     ReservationCache( long ttl ) {
+        this.ttl = ttl;
         cache = CacheBuilder.newBuilder()
             .maximumSize(1000)
             .concurrencyLevel( 300 )
@@ -54,15 +56,18 @@ public class ReservationCache {
     }
 
     public UniqueValueActor.Reservation get( String rowKey ) {
+        if ( ttl == 0 ) { return null; }
         UniqueValueActor.Reservation res = cache.getIfPresent( rowKey );
         return res;
     }
 
     public void cacheReservation( UniqueValueActor.Reservation reservation ) {
+        if ( ttl == 0 ) { return; }
         cache.put( reservation.getConsistentHashKey(), reservation );
     }
 
     public void cancelReservation( UniqueValueActor.Cancellation cancellation ) {
+        if ( ttl == 0 ) { return; }
         cache.invalidate( cancellation.getConsistentHashKey() );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 14565c2..d94b7b5 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -190,7 +190,7 @@ public class EntityCollectionManagerIT {
 
         assertEquals( "Same value", createReturned, loadReturned );
 
-        manager.mark( createReturned.getId() ).toBlocking().last();
+        manager.mark( createReturned.getId(), null ).toBlocking().last();
 
         loadObservable = manager.load( createReturned.getId() );