You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/07/11 15:35:25 UTC
[15/50] [abbrv] usergrid git commit: Fix authoritative region logic
and add property for specifying the system-wide authoritative region.
Fix authoritative region logic and add property for specifying the system-wide authoritative region.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58fc657c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58fc657c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58fc657c
Branch: refs/heads/release-2.1.1
Commit: 58fc657cb501312934ba7d9888ed31affbe3fcb7
Parents: 0c5deac
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Jun 17 12:43:18 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Jun 17 12:43:18 2016 -0400
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 7 +++++--
.../corepersistence/CpEntityManager.java | 14 ++++++++++---
.../corepersistence/CpEntityManagerFactory.java | 22 ++++++++++++++------
.../service/ApplicationServiceImpl.java | 9 +++++---
.../collection/EntityCollectionManager.java | 3 ++-
.../impl/EntityCollectionManagerImpl.java | 6 +++---
.../mvcc/stage/write/WriteCommit.java | 6 +++++-
.../mvcc/stage/write/WriteUniqueVerify.java | 6 +++++-
.../collection/uniquevalues/AkkaFig.java | 10 ++++-----
.../uniquevalues/ReservationCache.java | 5 +++++
.../collection/EntityCollectionManagerIT.java | 2 +-
11 files changed, 63 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index d7a3311..ffe5fc1 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -443,16 +443,19 @@ collection.akka.region.seeds=us-east-1:localhost:2551
# The number of unique value actors to start on each Usergrid instance.
collection.akka.uniquevalue.actors=300
-# TTL of unique value reservastion in in-memory cache
+# TTL of unique value reservation in in-memory cache
collection.akka.uniquevalue.cache.ttl=10
# TTL of a unique value reservation when written to Cassandra
collection.akka.uniquevalue.reservation.ttl=10
+# If no region specified for type, use the authoritative region
+#collection.akka.uniquevalue.authoritative.region=
+
############################## Usergrid Scheduler ###########################
#
-# Usergrid uses a scheduler for some functions such as scheduled push notificatins.
+# Usergrid uses a scheduler for some functions such as scheduled push notifications.
# Use the below settings to configure the scheduler.
#
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index b33321b..b0d3f59 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -43,6 +43,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.FieldSet;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.*;
@@ -104,6 +105,8 @@ public class CpEntityManager implements EntityManager {
private final UUID applicationId;
private final EntityManagerFig entityManagerFig;
+ private final AkkaFig akkaFig;
+
private Application application;
@@ -168,6 +171,7 @@ public class CpEntityManager implements EntityManager {
final AsyncEventService indexService,
final ManagerCache managerCache,
final MetricsFactory metricsFactory,
+ final AkkaFig akkaFig,
final EntityManagerFig entityManagerFig,
final GraphManagerFactory graphManagerFactory,
final CollectionService collectionService,
@@ -176,6 +180,7 @@ public class CpEntityManager implements EntityManager {
final UUID applicationId ) {
this.entityManagerFig = entityManagerFig;
+ this.akkaFig = akkaFig;
Preconditions.checkNotNull( cass, "cass must not be null" );
Preconditions.checkNotNull( counterUtils, "counterUtils must not be null" );
@@ -249,8 +254,7 @@ public class CpEntityManager implements EntityManager {
*/
org.apache.usergrid.persistence.model.entity.Entity load( Id entityId ) {
- return ecm .load( entityId ).toBlocking()
- .lastOrDefault(null);
+ return ecm .load( entityId ).toBlocking().lastOrDefault(null);
}
@@ -605,7 +609,8 @@ public class CpEntityManager implements EntityManager {
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
//Step 1 & 2 of delete
- return ecm.mark( entityId ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
+ String region = this.lookupRegionForType( entityRef.getType() );
+ return ecm.mark( entityId, region ).mergeWith( gm.markNode( entityId, createGraphOperationTimestamp() ) );
}
@@ -3058,6 +3063,8 @@ public class CpEntityManager implements EntityManager {
private String lookupRegionForType( String type ) {
String region = null;
+
+ // get collection settings for type
MapManager mm = getMapManagerForTypes();
CollectionSettingsCache collectionSettingsCache = collectionSettingsCacheFactory.getInstance( mm );
String collectionName = Schema.defaultCollectionName( type );
@@ -3068,6 +3075,7 @@ public class CpEntityManager implements EntityManager {
if ( collectionSettings.isPresent() && collectionSettings.get().get("region") != null ) {
region = collectionSettings.get().get("region").toString();
}
+
return region;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 8e8c5e8..bc1b335 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -80,7 +80,9 @@ import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
public class CpEntityManagerFactory implements EntityManagerFactory, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger( CpEntityManagerFactory.class );
+
private final EntityManagerFig entityManagerFig;
+ private final AkkaFig akkaFig;
private ApplicationContext applicationContext;
@@ -123,6 +125,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
this.injector = injector;
this.reIndexService = injector.getInstance(ReIndexService.class);
this.entityManagerFig = injector.getInstance(EntityManagerFig.class);
+ this.akkaFig = injector.getInstance( AkkaFig.class );
this.managerCache = injector.getInstance( ManagerCache.class );
this.metricsFactory = injector.getInstance( MetricsFactory.class );
this.indexService = injector.getInstance( AsyncEventService.class );
@@ -134,9 +137,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
Properties properties = cassandraService.getProperties();
this.entityManagers = createEntityManagerCache( properties );
- AkkaFig akkaFig = injector.getInstance( AkkaFig.class );
-
-
logger.info("EntityManagerFactoring starting...");
if ( akkaFig.getAkkaEnabled() ) {
@@ -354,9 +354,19 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private EntityManager _getEntityManager( UUID applicationId ) {
- EntityManager em = new CpEntityManager( cassandraService, counterUtils, indexService, managerCache,
- metricsFactory, entityManagerFig, graphManagerFactory, collectionService, connectionService,
- collectionSettingsCacheFactory, applicationId );
+ EntityManager em = new CpEntityManager(
+ cassandraService,
+ counterUtils,
+ indexService,
+ managerCache,
+ metricsFactory,
+ akkaFig,
+ entityManagerFig,
+ graphManagerFactory,
+ collectionService,
+ connectionService,
+ collectionSettingsCacheFactory,
+ applicationId );
return em;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
index 91f11f5..c6b3b15 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ApplicationServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -60,7 +61,7 @@ public class ApplicationServiceImpl implements ApplicationService{
private final MapManagerFactory mapManagerFactory;
private final GraphManagerFactory graphManagerFactory;
private final CollectionSettingsCacheFactory collectionSettingsCacheFactory;
-
+ private final AkkaFig akkaFig;
@Inject
@@ -70,7 +71,8 @@ public class ApplicationServiceImpl implements ApplicationService{
EventBuilder eventBuilder,
MapManagerFactory mapManagerFactory,
GraphManagerFactory graphManagerFactory,
- CollectionSettingsCacheFactory collectionSettingsCacheFactory
+ CollectionSettingsCacheFactory collectionSettingsCacheFactory,
+ AkkaFig akkaFig
){
this.allEntityIdsObservable = allEntityIdsObservable;
@@ -80,6 +82,7 @@ public class ApplicationServiceImpl implements ApplicationService{
this.mapManagerFactory = mapManagerFactory;
this.graphManagerFactory = graphManagerFactory;
this.collectionSettingsCacheFactory = collectionSettingsCacheFactory;
+ this.akkaFig = akkaFig;
}
@@ -112,7 +115,7 @@ public class ApplicationServiceImpl implements ApplicationService{
}
countObservable = countObservable.map(id -> {
- entityCollectionManager.mark((Id) id)
+ entityCollectionManager.mark((Id) id, null )
.mergeWith(graphManager.markNode((Id) id, createGraphOperationTimestamp())).toBlocking().last();
return id;
})
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 0dad0d7..b6056b5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -51,9 +51,10 @@ public interface EntityCollectionManager {
* @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will
* also remove all unique properties for this entity
*
+ * @param region
* @return The observable of the id after the operation has completed
*/
- Observable<Id> mark( Id entityId );
+ Observable<Id> mark(Id entityId, String region);
/**
* @param entityId The entity id to load.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 09bd01b..3cc4a07 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -214,14 +214,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Override
- public Observable<Id> mark( final Id entityId ) {
+ public Observable<Id> mark(final Id entityId, String region) {
Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
- Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
- .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+ Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId, region ) )
+ .map( markStart ).doOnNext( markCommit ).compose( uniqueCleanup ).map(
entityEvent -> entityEvent.getEvent().getId() );
return ObservableTimer.time( o, deleteTimer );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index abb54c9..44028ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -131,7 +131,11 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
// akkaFig may be null when this is called from JUnit tests
if ( akkaFig != null && akkaFig.getAkkaEnabled() ) {
- confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, ioEvent.getRegion() );
+ String region = ioEvent.getRegion();
+ if ( region == null ) {
+ region = akkaFig.getAkkaAuthoritativeRegion();
+ }
+ confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
} else {
confirmUniqueFields( mvccEntity, version, applicationScope, logMutation );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index acfc5f6..da394f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -122,8 +122,12 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
final ApplicationScope applicationScope = ioevent.getEntityCollection();
+ String region = ioevent.getRegion();
+ if ( region == null ) {
+ region = akkaFig.getAkkaAuthoritativeRegion();
+ }
try {
- akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), ioevent.getRegion() );
+ akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(), region );
} catch (UniqueValueException e) {
Map<String, Field> violations = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
index e709920..0f97403 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
@@ -47,8 +47,7 @@ public interface AkkaFig extends GuicyFig, Serializable {
String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
- String AKKA_UNIQUEVALUE_REGION_TYPES = "collection.akka.uniquevalue.region.types";
-
+ String AKKA_AUTHORITATIVE_REGION = "collection.akka.uniquevalue.authoritative.region";
/**
* Use Akka or nah
@@ -96,11 +95,10 @@ public interface AkkaFig extends GuicyFig, Serializable {
String getRegionSeeds();
/**
- * Authoritative regions may be specified for types
- * Comma-separated lists of region types each with format {region}:{type}
+ * If no region specified for type, use the authoritative region
*/
- @Key(AKKA_UNIQUEVALUE_REGION_TYPES)
- String getRegionTypes();
+ @Key(AKKA_AUTHORITATIVE_REGION)
+ String getAkkaAuthoritativeRegion();
/**
* Unique Value cache TTL in seconds.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
index 8dba606..24b7f6e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
@@ -29,11 +29,13 @@ public class ReservationCache {
private static final Logger logger = LoggerFactory.getLogger( RequestActor.class );
Cache<String, UniqueValueActor.Reservation> cache;
+ long ttl;
// use hokey old-style singleton because its not that easy to get Guice into an actor
private static ReservationCache instance = null;
ReservationCache( long ttl ) {
+ this.ttl = ttl;
cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.concurrencyLevel( 300 )
@@ -54,15 +56,18 @@ public class ReservationCache {
}
public UniqueValueActor.Reservation get( String rowKey ) {
+ if ( ttl == 0 ) { return null; }
UniqueValueActor.Reservation res = cache.getIfPresent( rowKey );
return res;
}
public void cacheReservation( UniqueValueActor.Reservation reservation ) {
+ if ( ttl == 0 ) { return; }
cache.put( reservation.getConsistentHashKey(), reservation );
}
public void cancelReservation( UniqueValueActor.Cancellation cancellation ) {
+ if ( ttl == 0 ) { return; }
cache.invalidate( cancellation.getConsistentHashKey() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58fc657c/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 14565c2..d94b7b5 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -190,7 +190,7 @@ public class EntityCollectionManagerIT {
assertEquals( "Same value", createReturned, loadReturned );
- manager.mark( createReturned.getId() ).toBlocking().last();
+ manager.mark( createReturned.getId(), null ).toBlocking().last();
loadObservable = manager.load( createReturned.getId() );