You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/12 23:14:51 UTC

[01/15] usergrid git commit: Address cleanup not running on write for unique values. Cleanup now happens on write.

Repository: usergrid
Updated Branches:
  refs/heads/master 67ab24430 -> 14dd48d39


Address cleanup not running on write for unique values.  Cleanup now happens on write.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/419c0131
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/419c0131
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/419c0131

Branch: refs/heads/master
Commit: 419c0131ff299a25c2d5c2af1186f1f11c998925
Parents: 39ccdb1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 6 20:30:19 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 6 20:30:19 2015 -0700

----------------------------------------------------------------------
 .../impl/EntityCollectionManagerImpl.java       |  4 +-
 .../mvcc/stage/delete/UniqueCleanup.java        | 12 ++++-
 .../mvcc/stage/write/WriteCommit.java           |  6 +--
 .../collection/EntityCollectionManagerIT.java   | 55 ++++++++++++++++++++
 .../mvcc/stage/delete/MarkCommitTest.java       |  2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |  2 +-
 6 files changed, 74 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index ff3bd7b..cb1515c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -188,7 +188,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
 
-        final Observable<Entity> write = observable.map( writeCommit );
+        final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
+                                                                              //now extract the ioEvent we need to return
+                                                                              .map( ioEvent -> ioEvent.getEvent().getEntity().get() );
 
         return ObservableTimer.time( write, writeTimer );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 3f66536..8aa5cfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -89,6 +89,9 @@ public class UniqueCleanup
                 final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
                 final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
                 final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+                //if it's been deleted, we need to remove everything up to an inclusive of this version.
+                //if it has not, we want to delete everything < this version
+                final boolean isDeleted = !mvccEntityCollectionIoEvent.getEvent().getEntity().isPresent();
 
 
                 //TODO Refactor this logic into a a class that can be invoked from anywhere
@@ -108,7 +111,14 @@ public class UniqueCleanup
                             logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
                             final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
                             //TODO: should this be equals? That way we clean up the one marked as well
-                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+
+
+                            if(isDeleted){
+                                return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+                            }
+
+                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) >= 0;
+
                         } )
 
                             //buffer our buffer size, then roll them all up in a single batch mutation

http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 2afb144..9b1a393 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -56,7 +56,7 @@ import rx.functions.Func1;
  * data store before returning
  */
 @Singleton
-public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> {
+public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( WriteCommit.class );
 
@@ -84,7 +84,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
 
 
     @Override
-    public Entity call( final CollectionIoEvent<MvccEntity> ioEvent ) {
+    public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioEvent ) {
 
         final MvccEntity mvccEntity = ioEvent.getEvent();
         MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
@@ -134,6 +134,6 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
         }
 
 
-        return mvccEntity.getEntity().get();
+        return ioEvent;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 68a04d0..e6c6909 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -315,6 +315,61 @@ public class EntityCollectionManagerIT {
 
 
     @Test
+    public void writeAndGetField2X() {
+
+
+        ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+        Entity newEntity = new Entity( new SimpleId( "test" ) );
+        Field field = new StringField( "testField", "unique", true );
+        newEntity.setField( field );
+
+        EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
+
+        Observable<Entity> observable = manager.write( newEntity );
+
+        Entity createReturned = observable.toBlocking().lastOrDefault( null );
+
+
+        assertNotNull( "Id was assigned", createReturned.getId() );
+        assertNotNull( "Version was assigned", createReturned.getVersion() );
+
+        Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+        assertNotNull( id );
+        assertEquals( newEntity.getId(), id );
+
+        Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
+        id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+        assertNull( id );
+
+
+        //ensure we clean up
+
+        Field fieldSecond = new StringField( "testField", "unique2", true );
+        newEntity.setField( fieldSecond );
+
+        Observable<Entity> observableSecond = manager.write( newEntity );
+
+        Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
+
+
+        assertNotNull( "Id was assigned", createReturnedSecond.getId() );
+        assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
+
+        Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+
+        assertNull(idFirst);
+
+        Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+
+        assertNotNull( idSecond );
+        assertEquals( newEntity.getId(), idSecond );
+
+
+    }
+
+
+    @Test
     public void updateVersioning() {
 
         // create entity

http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index 1e8fcd1..ad6eac6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -75,7 +75,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
 
         //verify the observable is correct
-        Entity result  = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) );
+        Entity result  = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index fa6077f..58642d3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -87,7 +87,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
 
 
-        Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) );
+        Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
 
 
         //verify the log entry is correct


[15/15] usergrid git commit: Merge branch 'refs/heads/2.1-release'

Posted by to...@apache.org.
Merge branch 'refs/heads/2.1-release'


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/14dd48d3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/14dd48d3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/14dd48d3

Branch: refs/heads/master
Commit: 14dd48d39d37abf97092264003942dcb73b3ccdf
Parents: 67ab244 6f61b05
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 15:14:44 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 15:14:44 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 132 ++++++++++++++-----
 .../asyncevents/AmazonAsyncEventService.java    |  39 +++++-
 .../asyncevents/AsyncEventsSchedulerFig.java    |  94 +++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   2 +-
 .../asyncevents/EventExecutionScheduler.java    |  37 ++++++
 .../traverse/ReadGraphCollectionFilter.java     |   3 +-
 .../traverse/ReadGraphConnectionFilter.java     |   3 +-
 .../corepersistence/rx/impl/AsyncRepair.java    |  38 ++++++
 .../rx/impl/ResponseImportTasks.java            |  38 ++++++
 .../service/ServiceSchedulerFig.java            |  66 ++++++++++
 .../collection/guice/CollectionModule.java      |  32 +++++
 .../guice/CollectionTaskExecutor.java           |  35 -----
 .../EntityCollectionManagerFactoryImpl.java     |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  17 ++-
 .../mvcc/stage/delete/UniqueCleanup.java        |  12 +-
 .../mvcc/stage/write/WriteCommit.java           |  10 +-
 .../scheduler/CollectionExecutorScheduler.java  |  52 ++++++++
 .../scheduler/CollectionSchedulerFig.java       |  53 ++++++++
 .../collection/EntityCollectionManagerIT.java   |  68 ++++++++++
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   2 +-
 .../core/executor/TaskExecutorFactory.java      | 108 +++++++++++----
 .../persistence/core/guice/CommonModule.java    |  25 ++--
 .../persistence/core/rx/RxSchedulerFig.java     |  71 ----------
 .../core/rx/RxTaskSchedulerImpl.java            |  81 +-----------
 .../index/impl/DeIndexOperation.java            |   4 +-
 .../persistence/index/impl/IndexingUtils.java   |  21 +++
 .../index/impl/IndexingUtilsTest.java           |  36 +++++
 .../usergrid/services/AbstractService.java      |  13 +-
 29 files changed, 815 insertions(+), 284 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ae69b6f,5d2d8dc..aacf6e9
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@@ -16,7 -16,8 +16,9 @@@
  package org.apache.usergrid.corepersistence;
  
  
 +import org.apache.usergrid.persistence.cache.guice.CacheModule;
+ import java.util.concurrent.ThreadPoolExecutor;
+ 
  import org.safehaus.guicyfig.GuicyFigModule;
  
  import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 90840af,179b3c4..adaed0f
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@@ -223,44 -221,46 +223,61 @@@ public class IndexingUtils 
      }
  
  
 -    /**
 -     * Parse the document id into a candidate result
 -     */
 -    public static CandidateResult parseIndexDocId( final String documentId ) {
 +    public static CandidateResult parseIndexDocId( final SearchHit hit ) {
 +        return parseIndexDocId(hit.getId());
 +    }
 +
 +    public static CandidateResult parseIndexDocId( final SearchHit hit, boolean isGeo ) {
  
 +        final String documentId = hit.getId();
 +        final double distance = isGeo ? (double) hit.sortValues()[0] : -1;
 +        return parseIndexDocId(documentId,distance);
 +    }
 +
 +    public static CandidateResult parseIndexDocId( final String documentId ) {
 +        return parseIndexDocId(documentId,-1);
 +    }
 +        /**
 +         * Parse the document id into a candidate result
 +         */
 +    public static CandidateResult parseIndexDocId( final String documentId, final double distance ) {
  
 -        final Matcher matcher = DOCUMENT_PATTERN.matcher( documentId );
 +        final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
  
 -        Preconditions.checkArgument( matcher.matches(), "Pattern for document id did not match expected format" );
 -        Preconditions.checkArgument( matcher.groupCount() == 9, "9 groups expected in the pattern" );
 +        Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
 +        Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
  
          //Other fields can be parsed using groups.  The groups start at value 1, group 0 is the entire match
 -        final String entityUUID = matcher.group( 3 );
 -        final String entityType = matcher.group( 4 );
 +        final String entityUUID = matcher.group(3);
 +        final String entityType = matcher.group(4);
  
 -        final String versionUUID = matcher.group( 5 );
 +        final String versionUUID = matcher.group(5);
  
  
 -        Id entityId = new SimpleId( UUID.fromString( entityUUID ), entityType );
 +        Id entityId = new SimpleId(UUID.fromString(entityUUID), entityType);
  
 -        return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
 +        return distance >= 0
 +            ? new GeoCandidateResult(entityId, UUID.fromString(versionUUID), documentId, distance)
 +            : new CandidateResult(entityId, UUID.fromString(versionUUID), documentId);
      }
  
+     /**
+      * Parse the document id into a candidate result
+      */
+     public static UUID parseAppIdFromIndexDocId( final String documentId) {
+ 
+         final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
+ 
+         Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
+         Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
+ 
+         //Other fields can be parsed using groups.  The groups start at value 1, group 0 is the entire match
+         final String appUUID = matcher.group(1);
+ 
+         return UUID.fromString(appUUID);
+ 
+     }
+ 
  
      /**
       * Get the entity type

http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index e7d4fc4,3887f92..88d87fe
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@@ -27,7 -27,9 +27,10 @@@ import java.util.Set
  import java.util.UUID;
  
  import com.codahale.metrics.Timer;
 +import org.apache.usergrid.persistence.cache.CacheFactory;
+ 
+ import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
+ import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
  import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
  import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
  import org.slf4j.Logger;


[13/15] usergrid git commit: Merge commit 'refs/pull/433/head' of github.com:apache/usergrid into 2.1-release

Posted by to...@apache.org.
Merge commit 'refs/pull/433/head' of github.com:apache/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/63f49bac
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/63f49bac
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/63f49bac

Branch: refs/heads/master
Commit: 63f49bac58395b8fc9b36fedd59dae297e63459f
Parents: b895643 f840623
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Nov 12 13:19:54 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Nov 12 13:19:54 2015 -0800

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 134 ++++++++++++++-----
 .../asyncevents/AmazonAsyncEventService.java    |   1 +
 .../asyncevents/AsyncEventsSchedulerFig.java    |  94 +++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   2 +-
 .../asyncevents/EventExecutionScheduler.java    |  37 +++++
 .../traverse/ReadGraphCollectionFilter.java     |   3 +-
 .../traverse/ReadGraphConnectionFilter.java     |   3 +-
 .../corepersistence/rx/impl/AsyncRepair.java    |  38 ++++++
 .../rx/impl/ResponseImportTasks.java            |  38 ++++++
 .../service/ServiceSchedulerFig.java            |  66 +++++++++
 .../collection/guice/CollectionModule.java      |  32 +++++
 .../guice/CollectionTaskExecutor.java           |  35 -----
 .../EntityCollectionManagerFactoryImpl.java     |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  17 ++-
 .../mvcc/stage/delete/UniqueCleanup.java        |  12 +-
 .../mvcc/stage/write/WriteCommit.java           |  10 +-
 .../scheduler/CollectionExecutorScheduler.java  |  52 +++++++
 .../scheduler/CollectionSchedulerFig.java       |  53 ++++++++
 .../collection/EntityCollectionManagerIT.java   |  68 ++++++++++
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   2 +-
 .../core/executor/TaskExecutorFactory.java      | 108 +++++++++++----
 .../persistence/core/guice/CommonModule.java    |  25 ++--
 .../persistence/core/rx/RxSchedulerFig.java     |  71 ----------
 .../core/rx/RxTaskSchedulerImpl.java            |  81 +----------
 .../usergrid/services/AbstractService.java      |  13 +-
 26 files changed, 724 insertions(+), 278 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/63f49bac/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------


[05/15] usergrid git commit: fix alias issues

Posted by to...@apache.org.
fix alias issues


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fec6520e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fec6520e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fec6520e

Branch: refs/heads/master
Commit: fec6520eb47fa6fe7bc1d8d7fad6a729874f3475
Parents: 204bf04
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 11:07:29 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 11:47:08 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 15 ++++++++
 .../index/impl/DeIndexOperation.java            |  4 +--
 .../persistence/index/impl/IndexingUtils.java   | 21 ++++++++++++
 .../index/impl/IndexingUtilsTest.java           | 36 ++++++++++++++++++++
 4 files changed, 74 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..77777c2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -550,6 +551,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
+        checkInitialize(indexOperationMessage);
 
         //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
         //so we'll let compaction on column expiration handle deletion
@@ -565,6 +567,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
+    private void checkInitialize(final IndexOperationMessage indexOperationMessage) {
+        indexOperationMessage.getIndexRequests().stream().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+            entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+        });
+
+        indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+            entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+        });
+    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index dbecf8a..aefceda 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -44,10 +44,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
 public class DeIndexOperation implements BatchOperation {
 
     @JsonProperty
-    private String[] indexes;
+    public String[] indexes;
 
     @JsonProperty
-    private String documentId;
+    public String documentId;
 
 
     public DeIndexOperation() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 18cb928..179b3c4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -244,6 +244,23 @@ public class IndexingUtils {
         return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
     }
 
+    /**
+     * Parse the document id into a candidate result
+     */
+    public static UUID parseAppIdFromIndexDocId( final String documentId) {
+
+        final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
+
+        Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
+        Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
+
+        //Other fields can be parsed using groups.  The groups start at value 1, group 0 is the entire match
+        final String appUUID = matcher.group(1);
+
+        return UUID.fromString(appUUID);
+
+    }
+
 
     /**
      * Get the entity type
@@ -262,4 +279,8 @@ public class IndexingUtils {
         sb.append( ENTITY_TYPE_NAME).append("(" ).append( type ).append( ")" );
         return sb.toString();
     }
+
+    public static UUID getApplicationIdFromIndexDocId(String documentId) {
+        return parseAppIdFromIndexDocId(documentId);
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
index 97389b2..d93f8a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseAppIdFromIndexDocId;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseIndexDocId;
 import static org.junit.Assert.assertEquals;
 
@@ -91,6 +92,41 @@ public class IndexingUtilsTest {
 
 
     @Test
+    public void testAppIdFromDocumentId() {
+
+        final ApplicationScopeImpl applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );
+
+        final Id id = new SimpleId( "id" );
+        final UUID version = UUIDGenerator.newTimeUUID();
+
+        final SearchEdgeImpl searchEdge =
+            new SearchEdgeImpl( new SimpleId( "source" ), "users", SearchEdge.NodeType.TARGET );
+
+        final String output = IndexingUtils.createIndexDocId( applicationScope, id, version, searchEdge );
+
+
+        final String expected =
+            "appId(" + applicationScope.getApplication().getUuid() + ",application).entityId(" + id.getUuid() + "," + id
+                .getType() + ").version(" + version + ").nodeId(" + searchEdge.getNodeId().getUuid() + "," + searchEdge
+                .getNodeId().getType() + ").edgeName(users).nodeType(TARGET)";
+
+
+        assertEquals( output, expected );
+
+
+        //now parse it
+
+        final CandidateResult parsedId = parseIndexDocId( output );
+
+        assertEquals(version, parsedId.getVersion());
+        assertEquals(id, parsedId.getId());
+
+        final UUID appId = parseAppIdFromIndexDocId(output);
+        assertEquals(appId,applicationScope.getApplication().getUuid());
+    }
+
+
+    @Test
     public void testDocumentIdPipes() {
 
         final ApplicationScopeImpl applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );


[04/15] usergrid git commit: Refactored schedulers to have separate schedulers for different tasks

Posted by to...@apache.org.
Refactored schedulers to have separate schedulers for different tasks

Also fixes a bug with unique values.  Values are now validated on read to ensure that unique value is still valid.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0e1f0e64
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0e1f0e64
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0e1f0e64

Branch: refs/heads/master
Commit: 0e1f0e64176ce13b579762c42d8358eaa7543f20
Parents: 7da99c7
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 11 16:55:00 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:27:16 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 130 ++++++++++++++-----
 .../asyncevents/AmazonAsyncEventService.java    |   1 +
 .../asyncevents/AsyncEventsSchedulerFig.java    |  94 ++++++++++++++
 .../asyncevents/AsyncIndexProvider.java         |   2 +-
 .../asyncevents/EventExecutionScheduler.java    |  37 ++++++
 .../traverse/ReadGraphCollectionFilter.java     |   3 +-
 .../traverse/ReadGraphConnectionFilter.java     |   3 +-
 .../corepersistence/rx/impl/AsyncRepair.java    |  38 ++++++
 .../corepersistence/rx/impl/ImportRepair.java   |  38 ++++++
 .../service/ServiceSchedulerFig.java            |  48 +++++++
 .../collection/guice/CollectionModule.java      |  30 +++++
 .../guice/CollectionTaskExecutor.java           |  35 -----
 .../EntityCollectionManagerFactoryImpl.java     |   5 +-
 .../impl/EntityCollectionManagerImpl.java       |  16 ++-
 .../mvcc/stage/write/WriteCommit.java           |   4 +-
 .../scheduler/CollectionExecutorScheduler.java  |  52 ++++++++
 .../scheduler/CollectionSchedulerFig.java       |  53 ++++++++
 .../collection/EntityCollectionManagerIT.java   |  53 +++++---
 .../core/executor/TaskExecutorFactory.java      | 101 ++++++++++----
 .../persistence/core/guice/CommonModule.java    |  25 ++--
 .../persistence/core/rx/RxSchedulerFig.java     |  71 ----------
 .../core/rx/RxTaskSchedulerImpl.java            |  81 +-----------
 .../usergrid/services/AbstractService.java      |  12 +-
 23 files changed, 639 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 959edec..09db151 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,12 +16,16 @@
 package org.apache.usergrid.corepersistence;
 
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventsSchedulerFig;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventExecutionScheduler;
 import org.apache.usergrid.corepersistence.index.ApplicationIndexBucketLocator;
 import org.apache.usergrid.corepersistence.index.CoreIndexFig;
 import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -42,6 +46,8 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
 import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -51,20 +57,26 @@ import org.apache.usergrid.corepersistence.service.CollectionService;
 import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
 import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
 import org.apache.usergrid.corepersistence.service.StatusService;
 import org.apache.usergrid.corepersistence.service.StatusServiceImpl;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
@@ -73,25 +85,22 @@ import com.google.inject.multibindings.Multibinder;
 /**
  * Guice Module that encapsulates Core Persistence.
  */
-public class CoreModule  extends AbstractModule {
-
-
+public class CoreModule extends AbstractModule {
 
 
     @Override
     protected void configure() {
 
-        install( new CommonModule());
+        install( new CommonModule() );
         install( new CollectionModule() {
             /**
              * configure our migration data provider for all entities in the system
              */
             @Override
-           public void configureMigrationProvider() {
+            public void configureMigrationProvider() {
 
-                bind(new TypeLiteral<MigrationDataProvider<EntityIdScope>>(){}).to(
-                    AllEntitiesInSystemImpl.class );
-           }
+                bind( new TypeLiteral<MigrationDataProvider<EntityIdScope>>() {} ).to( AllEntitiesInSystemImpl.class );
+            }
         } );
         install( new GraphModule() {
 
@@ -100,30 +109,28 @@ public class CoreModule  extends AbstractModule {
              */
             @Override
             public void configureMigrationProvider() {
-                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
-                    AllNodesInGraphImpl.class );
+                bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to( AllNodesInGraphImpl.class );
             }
         } );
-        install(new IndexModule(){
+        install( new IndexModule() {
             @Override
             public void configureMigrationProvider() {
-                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
-                    AllApplicationsObservableImpl.class );
+                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} )
+                    .to( AllApplicationsObservableImpl.class );
             }
-        });
-       //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
-       //        install(new QueueModule());
+        } );
+        //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
+        //        install(new QueueModule());
 
-        bind(ManagerCache.class).to( CpManagerCache.class );
-        bind(ApplicationIdCacheFactory.class);
+        bind( ManagerCache.class ).to( CpManagerCache.class );
+        bind( ApplicationIdCacheFactory.class );
 
 
         /**
          * Create our migrations for within our core plugin
          */
         Multibinder<DataMigration> dataMigrationMultibinder =
-                    Multibinder.newSetBinder( binder(),
-                        new TypeLiteral<DataMigration>() {}, CoreMigration.class );
+            Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration>() {}, CoreMigration.class );
 
 
         dataMigrationMultibinder.addBinding().to( DeDupConnectionDataMigration.class );
@@ -135,7 +142,7 @@ public class CoreModule  extends AbstractModule {
         plugins.addBinding().to( MigrationModuleVersionPlugin.class );
 
         bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
-        bind( AllEntityIdsObservable.class).to( AllEntityIdsObservableImpl.class );
+        bind( AllEntityIdsObservable.class ).to( AllEntityIdsObservableImpl.class );
 
 
         /*****
@@ -143,50 +150,103 @@ public class CoreModule  extends AbstractModule {
          *****/
 
 
-        bind( IndexService.class ).to(IndexServiceImpl.class);
+        bind( IndexService.class ).to( IndexServiceImpl.class );
 
         //bind the event handlers
-        bind( EventBuilder.class).to( EventBuilderImpl.class );
-        bind(ApplicationIndexBucketLocator.class);
+        bind( EventBuilder.class ).to( EventBuilderImpl.class );
+        bind( ApplicationIndexBucketLocator.class );
 
         //bind the queue provider
         bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
 
 
-        bind( ReIndexService.class).to(ReIndexServiceImpl.class);
+        bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
 
-        install(new FactoryModuleBuilder()
-            .implement(AggregationService.class, AggregationServiceImpl.class)
-            .build(AggregationServiceFactory.class));
+        install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
+                                           .build( AggregationServiceFactory.class ) );
 
-        bind(IndexLocationStrategyFactory.class).to( IndexLocationStrategyFactoryImpl.class );
+        bind( IndexLocationStrategyFactory.class ).to( IndexLocationStrategyFactoryImpl.class );
 
-        install(new GuicyFigModule(IndexProcessorFig.class));
-
-        install(new GuicyFigModule(CoreIndexFig.class));
+        install( new GuicyFigModule( IndexProcessorFig.class ) );
 
+        install( new GuicyFigModule( CoreIndexFig.class ) );
 
 
         install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
 
         install( new GuicyFigModule( EntityManagerFig.class ) );
 
+        install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
+
+        install( new GuicyFigModule( ServiceSchedulerFig.class ) );
+
         //install our pipeline modules
-        install(new PipelineModule());
+        install( new PipelineModule() );
 
         /**
          * Install our service operations
          */
 
-        bind( CollectionService.class).to( CollectionServiceImpl.class );
+        bind( CollectionService.class ).to( CollectionServiceImpl.class );
 
-        bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+        bind( ConnectionService.class ).to( ConnectionServiceImpl.class );
 
         bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
 
         bind( StatusService.class ).to( StatusServiceImpl.class );
+    }
+
+
+    @Provides
+    @Inject
+    @EventExecutionScheduler
+    public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxIoThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, threadCount, TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
 
+        return taskScheduler;
     }
 
+
+    @Provides
+    @Inject
+    @AsyncRepair
+    public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxRepairThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+        return taskScheduler;
+    }
+
+
+    @Provides
+    @Inject
+    @ImportRepair
+    public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+        final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
+        final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory
+            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+        return taskScheduler;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..24ec51f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -153,6 +153,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     final EventBuilder eventBuilder,
                                     final MapManagerFactory mapManagerFactory,
                                     final QueueFig queueFig,
+                                    @EventExecutionScheduler
                                     final RxTaskScheduler rxTaskScheduler ) {
         this.indexProducer = indexProducer;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
new file mode 100644
index 0000000..83eb02e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ *
+ */
+@FigSingleton
+public interface AsyncEventsSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Amount of threads to use in async processing
+     */
+    String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+    /**
+     * Amount of threads to use in async processing
+     */
+    String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
+
+
+    /**
+     * Amount of threads to use in async processing
+     */
+    String IMPORT_SCHEDULER_THREADS = "import.io.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String IMPORT_SCHEDULER_NAME = "import.io.poolName";
+
+
+    @Default( "100" )
+    @Key( IO_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-SQS-Pool" )
+    @Key( IO_SCHEDULER_NAME )
+    String getIoSchedulerName();
+
+
+    @Default( "20" )
+    @Key( REPAIR_SCHEDULER_THREADS )
+    int getMaxRepairThreads();
+
+    @Default( "Usergrid-Repair-Pool" )
+    @Key( REPAIR_SCHEDULER_NAME )
+    String getRepairPoolName();
+
+    @Default( "100" )
+    @Key( IMPORT_SCHEDULER_THREADS )
+    int getMaxImportThreads();
+
+    @Default( "Usergrid-Import-Pool" )
+    @Key( IMPORT_SCHEDULER_NAME )
+    String getImportSchedulerName();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2bace8d..d65cffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -62,7 +62,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
                               final QueueManagerFactory queueManagerFactory,
                               final MetricsFactory metricsFactory,
-                              final RxTaskScheduler rxTaskScheduler,
+                              @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
                               final IndexLocationStrategyFactory indexLocationStrategyFactory,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
new file mode 100644
index 0000000..ce09aae
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the event execution scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface EventExecutionScheduler {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 3d7df3b..3819659 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -44,7 +45,7 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
      */
     @Inject
     public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
-                                      final RxTaskScheduler rxTaskScheduler,
+                                      @AsyncRepair final RxTaskScheduler rxTaskScheduler,
                                       final EventBuilder eventBuilder,
                                       final AsyncEventService asyncEventService,
                                       @Assisted final String collectionName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index b2d368b..3c92c03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 
@@ -44,7 +45,7 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
      */
     @Inject
     public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
-                                      final RxTaskScheduler rxTaskScheduler,
+                                      @AsyncRepair final RxTaskScheduler rxTaskScheduler,
                                       final EventBuilder eventBuilder,
                                       final AsyncEventService asyncEventService,
                                       @Assisted final String connectionName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
new file mode 100644
index 0000000..aa2cc12
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface AsyncRepair {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
new file mode 100644
index 0000000..d65d04c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ImportRepair {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
new file mode 100644
index 0000000..ddaa01c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ *
+ */
+@FigSingleton
+public interface ServiceSchedulerFig extends GuicyFig {
+
+
+    /**
+     * The number of threads to use when importing entities into result sets
+     */
+    String SERVICE_IMPORT_THREADS = "service.import.threads";
+
+
+
+    @Default("20")
+    @Key( SERVICE_IMPORT_THREADS)
+    int getImportThreads();
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 78c7f37..0a6e270 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.collection.guice;
 
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -25,11 +27,18 @@ import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
 import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionSchedulerFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
 
 
 /**
@@ -45,6 +54,7 @@ public abstract class CollectionModule extends AbstractModule {
 
         // noinspection unchecked
         install( new GuicyFigModule( SerializationFig.class ) );
+        install( new GuicyFigModule( CollectionSchedulerFig.class ) );
         install( new SerializationModule() );
         install( new ServiceModule() );
 
@@ -62,6 +72,26 @@ public abstract class CollectionModule extends AbstractModule {
     }
 
 
+
+
+    @Provides
+    @Inject
+    @CollectionExecutorScheduler
+    public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
+
+        final String poolName = collectionSchedulerFig.getIoSchedulerName();
+        final int threadCount = collectionSchedulerFig.getMaxIoThreads();
+
+
+        final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+            TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+        final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor  );
+
+        return taskScheduler;
+    }
+
+
     /**
      * Gives callers the ability to to configure an instance of
      *

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
deleted file mode 100644
index 53c1f48..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface CollectionTaskExecutor {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 45cee06..a52ee9c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
 import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
@@ -74,7 +75,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
     private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
     private final Keyspace keyspace;
-    private final EntityCacheFig entityCacheFig;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
 
@@ -107,7 +107,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
                                                final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                                final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
                                                final Keyspace keyspace, final EntityCacheFig entityCacheFig,
-                                               MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
+                                               final MetricsFactory metricsFactory, @CollectionExecutorScheduler  final RxTaskScheduler rxTaskScheduler ) {
 
         this.writeStart = writeStart;
         this.writeVerifyUnique = writeVerifyUnique;
@@ -123,7 +123,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
         this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
         this.keyspace = keyspace;
-        this.entityCacheFig = entityCacheFig;
         this.metricsFactory = metricsFactory;
         this.rxTaskScheduler = rxTaskScheduler;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index cb1515c..d6bbdc5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.codahale.metrics.Timer;
@@ -117,7 +118,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     private final Timer deleteTimer;
     private final Timer fieldIdTimer;
     private final Timer fieldEntityTimer;
-    private final Timer updateTimer;
     private final Timer loadTimer;
     private final Timer getLatestTimer;
 
@@ -165,7 +165,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.delete");
         this.fieldIdTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldId");
         this.fieldEntityTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldEntity");
-        this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.update");
         this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.load");
         this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.latest");
     }
@@ -188,8 +187,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
         Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
 
 
-        final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
-                                                                              //now extract the ioEvent we need to return
+        final Observable<Entity> write = observable.map( writeCommit )
+                                                   .map(ioEvent -> {
+                //fire this in the background so we don't block writes
+                Observable.just( ioEvent ).compose( uniqueCleanup ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+                return ioEvent;
+            }
+         )
+                                                                              //now extract the ioEvent we need to return and update the version
                                                                               .map( ioEvent -> ioEvent.getEvent().getEntity().get() );
 
         return ObservableTimer.time( write, writeTimer );
@@ -358,7 +363,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                         continue;
                     }
 
-
                     //else add it to our result set
                     response.addEntity( expectedUnique.getField(), entity );
                 }
@@ -380,6 +384,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
     }
 
 
+
+
     // fire the stages
     public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
                                                                   WriteStart writeState ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 9b1a393..fe3f9a9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -94,7 +94,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         final ApplicationScope applicationScope = ioEvent.getEntityCollection();
 
         //set the version into the entity
-        EntityUtils.setVersion( mvccEntity.getEntity().get(), version );
+        final Entity entity = mvccEntity.getEntity().get();
+
+        EntityUtils.setVersion( entity, version );
 
         MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
         ValidationUtils.verifyTimeUuid( version ,"version" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
new file mode 100644
index 0000000..8f8aa00
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionExecutorScheduler {}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
new file mode 100644
index 0000000..daefa9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ *
+ */
+@FigSingleton
+public interface CollectionSchedulerFig extends GuicyFig {
+
+
+    /**
+     * Amount of threads to use in async processing
+     */
+    String COLLECTION_SCHEDULER_THREADS = "scheduler.collection.threads";
+
+
+    /**
+     * Name of pool to use when performing scheduling
+     */
+    String COLLECTION_SCHEDULER_NAME = "scheduler.collection.poolName";
+
+
+    @Default( "20" )
+    @Key( COLLECTION_SCHEDULER_THREADS )
+    int getMaxIoThreads();
+
+    @Default( "Usergrid-Collection-Pool" )
+    @Key( COLLECTION_SCHEDULER_NAME )
+    String getIoSchedulerName();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index e6c6909..115be99 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -58,6 +59,7 @@ import rx.Observable;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -315,18 +317,17 @@ public class EntityCollectionManagerIT {
 
 
     @Test
-    public void writeAndGetField2X() {
-
-
+    public void writeAndGetField2X() throws InterruptedException {
         ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
 
-        Entity newEntity = new Entity( new SimpleId( "test" ) );
-        Field field = new StringField( "testField", "unique", true );
-        newEntity.setField( field );
+        final Id entityId = new SimpleId( "test" );
+        Entity firstInstance = new Entity( entityId  );
+        Field firstField = new StringField( "testField", "unique", true );
+        firstInstance.setField( firstField );
 
         EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
 
-        Observable<Entity> observable = manager.write( newEntity );
+        Observable<Entity> observable = manager.write( firstInstance );
 
         Entity createReturned = observable.toBlocking().lastOrDefault( null );
 
@@ -334,21 +335,22 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturned.getId() );
         assertNotNull( "Version was assigned", createReturned.getVersion() );
 
-        Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
-        assertNotNull( id );
-        assertEquals( newEntity.getId(), id );
+        final Id existingId = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+        assertNotNull( existingId );
+        assertEquals( firstInstance.getId(), existingId );
 
         Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
-        id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
-        assertNull( id );
+        final Id noId = manager.getIdField( firstInstance.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+        assertNull( noId );
 
 
         //ensure we clean up
 
-        Field fieldSecond = new StringField( "testField", "unique2", true );
-        newEntity.setField( fieldSecond );
+        Entity secondInstance = new Entity( entityId  );
+        Field secondField = new StringField( firstField.getName(), "unique2", true );
+        secondInstance.setField( secondField );
 
-        Observable<Entity> observableSecond = manager.write( newEntity );
+        Observable<Entity> observableSecond = manager.write( secondInstance );
 
         Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
 
@@ -356,16 +358,27 @@ public class EntityCollectionManagerIT {
         assertNotNull( "Id was assigned", createReturnedSecond.getId() );
         assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
 
-        Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+        assertNotEquals( "Versions should not be equal", createReturned.getVersion(), createReturnedSecond.getVersion() );
 
-        assertNull(idFirst);
+        //sanity check, get the entity to ensure it's the right version
 
-        Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+        final Entity loadedVersion = manager.load( entityId ).toBlocking().last();
 
-        assertNotNull( idSecond );
-        assertEquals( newEntity.getId(), idSecond );
+        assertEquals(entityId, loadedVersion.getId());
+        assertEquals(createReturnedSecond.getVersion(), loadedVersion.getVersion());
 
+        //give clean time to run.  need to finish the todo below
+        Thread.sleep( 2000 );
 
+        //TODO, we need to implement verify and repair on this
+        final Id idFirst = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+        assertNull(idFirst);
+
+
+        final Id idSecond = manager.getIdField( secondInstance.getId().getType(), secondField ).toBlocking().lastOrDefault( null );
+
+        assertNotNull( idSecond );
+        assertEquals( secondInstance.getId(), idSecond );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index 3c6a750..bd3d3e9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,30 +20,45 @@
 package org.apache.usergrid.persistence.core.executor;
 
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * A task executor that allows you to submit tasks
  */
 public class TaskExecutorFactory {
 
-    private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+    private static final Logger log = LoggerFactory.getLogger( TaskExecutorFactory.class );
+
 
     public enum RejectionAction {
+        /**
+         * If there is no capacity left, throw an exception
+         */
         ABORT,
-        CALLERRUNS
+        /**
+         * If there is no capacity left, the caller runs the callable
+         */
+        CALLERRUNS,
+
+        /**
+         * If there is no capacity left, the request is logged and then silently dropped
+         */
+        DROP
     }
+
+
     /**
      * Create a task executor
-     * @param schedulerName
-     * @param maxThreadCount
-     * @param maxQueueSize
-     * @return
      */
     public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
                                                          final int maxQueueSize, RejectionAction rejectionAction ) {
@@ -52,22 +67,22 @@ public class TaskExecutorFactory {
         final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
 
 
-        if(rejectionAction.equals(RejectionAction.ABORT)){
-
+        if ( rejectionAction == RejectionAction.ABORT ) {
             return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
-
         }
-        else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+        else if ( rejectionAction == RejectionAction.CALLERRUNS ) {
 
             return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
-
-        }else{
-            //default to the thread pool with ABORT policy
-            return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
         }
-
+        else if ( rejectionAction == RejectionAction.DROP ) {
+            return new MaxSizeThreadPoolDrops( queue, schedulerName, maxThreadCount );
+        }
+        else {
+            throw new IllegalArgumentException( "Unable to create a scheduler with the arguments provided" );
+        }
     }
 
+
     /**
      * Create a thread pool that will reject work if our audit tasks become overwhelmed
      */
@@ -78,14 +93,29 @@ public class TaskExecutorFactory {
         }
     }
 
+
     /**
      * Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
      */
     private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
 
-        public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
-            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
-                new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+        public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName,
+                                            final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+                new CallerRunsHandler( poolName ) );
+        }
+    }
+
+
+    /**
+     * Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
+     */
+    private static final class MaxSizeThreadPoolDrops extends ThreadPoolExecutor {
+
+        public MaxSizeThreadPoolDrops( final BlockingQueue<Runnable> queue, final String poolName,
+                                       final int maxPoolSize ) {
+            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+                new DropHandler( poolName ) );
         }
     }
 
@@ -111,29 +141,50 @@ public class TaskExecutorFactory {
             Thread t = new Thread( r, threadName );
 
             //set it to be a daemon thread so it doesn't block shutdown
-            t.setDaemon(true);
+            t.setDaemon( true );
 
             return t;
         }
     }
 
+
     /**
      * The handler that will handle rejected executions and signal the interface
      */
-    private static final class RejectedHandler implements RejectedExecutionHandler {
+    private static final class CallerRunsHandler implements RejectedExecutionHandler {
 
         private final String poolName;
 
-        private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+        private CallerRunsHandler( final String poolName ) {this.poolName = poolName;}
+
 
         @Override
         public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r,
+                Thread.currentThread().getName() );
 
             //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
 
             r.run();
         }
+    }
+
+
+    /**
+     * The handler that will handle rejected executions and signal the interface
+     */
+    private static final class DropHandler implements RejectedExecutionHandler {
+
+        private final String poolName;
+
 
+        private DropHandler( final String poolName ) {this.poolName = poolName;}
+
+
+        @Override
+        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+            log.warn( "{} task queue full, dropping task {}", poolName, r );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index b93ba76..75e2b29 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,11 +19,6 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFig;
-import org.apache.usergrid.persistence.core.migration.data.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -32,14 +27,21 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCacheImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
 import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Key;
@@ -91,16 +93,11 @@ public class CommonModule extends AbstractModule {
         Multibinder.newSetBinder(binder(), MigrationPlugin.class);
 
 
-        /**
-         * RX java scheduler configuration
-         */
-
-        install(new GuicyFigModule(RxSchedulerFig.class));
 
         install(new GuicyFigModule(ClusterFig.class));
         bind(SettingsValidationCluster.class).asEagerSingleton(); //validate props from ClusterFig on startup
 
-        bind(RxTaskScheduler.class).to(RxTaskSchedulerImpl.class);
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
deleted file mode 100644
index 4511518..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- *
- */
-@FigSingleton
-public interface RxSchedulerFig extends GuicyFig {
-
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
-     * backpressure
-     */
-    String IO_SCHEDULER_THREADS = "scheduler.io.threads";
-
-
-    /**
-     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
-     * backpressure
-     */
-    String IO_SCHEDULER_NAME = "scheduler.io.poolName";
-
-    /**
-     * The number of threads to use when importing entities into result sets
-     */
-    String IO_IMPORT_THREADS = "scheduler.import.threads";
-
-
-
-
-    @Default( "100" )
-    @Key( IO_SCHEDULER_THREADS )
-    int getMaxIoThreads();
-
-    @Default( "Usergrid-RxIOPool" )
-    @Key(IO_SCHEDULER_NAME)
-    String getIoSchedulerName();
-
-    @Default("20")
-    @Key( IO_IMPORT_THREADS)
-    int getImportThreads();
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
index dce46cb..261cbeb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -20,18 +20,9 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -42,29 +33,17 @@ import rx.schedulers.Schedulers;
 /**
  * An implementation of the task scheduler that allows us to control the number of I/O threads
  */
-@Singleton
 public class RxTaskSchedulerImpl implements RxTaskScheduler {
 
-    private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class );
-
     private final Scheduler scheduler;
-    private final String poolName;
 
     @Inject
-    public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
-
-        this.poolName = schedulerFig.getIoSchedulerName();
-
-        final int poolSize = schedulerFig.getMaxIoThreads();
-
-
-        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
-
+    public RxTaskSchedulerImpl(final ThreadPoolExecutor executor){
 
-        final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
+        Preconditions.checkNotNull( executor , "executor must not be null");
 
 
-        this.scheduler = Schedulers.from(threadPool);
+        this.scheduler = Schedulers.from(executor);
 
 
     }
@@ -76,56 +55,4 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler {
     }
 
 
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) {
-
-            super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),  new RejectedHandler() );
-        }
-    }
-
-
-    /**
-     * Thread factory that will name and count threads for easier debugging
-     */
-    private final class CountingThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            final String threadName = poolName + "-" + newValue;
-
-            Thread t = new Thread( r, threadName  );
-
-            //set it to be a daemon thread so it doesn't block shutdown
-            t.setDaemon( true );
-
-            return t;
-        }
-    }
-
-
-    /**
-     * The handler that will handle rejected executions and signal the interface
-     */
-    private final class RejectedHandler implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
-
-            //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
-
-            r.run();
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index d032589..662370f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -27,6 +27,10 @@ import java.util.Set;
 import java.util.UUID;
 
 import com.codahale.metrics.Timer;
+
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.slf4j.Logger;
@@ -42,7 +46,6 @@ import org.apache.usergrid.persistence.EntityRef;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.security.shiro.utils.SubjectUtils;
 import org.apache.usergrid.services.ServiceParameter.IdParameter;
@@ -54,6 +57,7 @@ import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
 import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
 
 import com.google.inject.Injector;
+import com.google.inject.Key;
 
 import rx.Observable;
 import rx.Scheduler;
@@ -100,7 +104,7 @@ public abstract class AbstractService implements Service {
     protected Map<String, Object> defaultEntityMetadata;
 
     private Scheduler rxScheduler;
-    private RxSchedulerFig rxSchedulerFig;
+    private ServiceSchedulerFig rxSchedulerFig;
     private MetricsFactory metricsFactory;
     private Timer entityGetTimer;
     private Timer entitiesGetTimer;
@@ -117,8 +121,8 @@ public abstract class AbstractService implements Service {
         this.sm = sm;
         em = sm.getEntityManager();
         final Injector injector = sm.getApplicationContext().getBean( Injector.class );
-        rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
-        rxSchedulerFig = injector.getInstance(RxSchedulerFig.class);
+        rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+        rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
         metricsFactory = injector.getInstance(MetricsFactory.class);
         this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");
         this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), "importEntities.get");


[06/15] usergrid git commit: Addresses issues found in the review.

Posted by to...@apache.org.
Addresses issues found in the review.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7725d90e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7725d90e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7725d90e

Branch: refs/heads/master
Commit: 7725d90ec504f2002712636cfcad6a395de21226
Parents: 0e1f0e6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 11:54:12 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:54:12 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    | 12 ++++---
 .../corepersistence/rx/impl/ImportRepair.java   | 38 --------------------
 .../rx/impl/ResponseImportTasks.java            | 38 ++++++++++++++++++++
 .../collection/guice/CollectionModule.java      |  4 ++-
 .../impl/EntityCollectionManagerImpl.java       |  3 ++
 .../core/executor/TaskExecutorFactory.java      |  9 ++++-
 .../usergrid/services/AbstractService.java      |  5 ++-
 7 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 09db151..650bb4d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -47,7 +47,7 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
 import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
-import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
 import org.apache.usergrid.corepersistence.service.AggregationService;
 import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
 import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -77,6 +77,7 @@ import org.apache.usergrid.persistence.index.guice.IndexModule;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
+import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
 import com.google.inject.multibindings.Multibinder;
@@ -200,6 +201,7 @@ public class CoreModule extends AbstractModule {
     @Provides
     @Inject
     @EventExecutionScheduler
+    @Singleton
     public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
 
         final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
@@ -218,6 +220,7 @@ public class CoreModule extends AbstractModule {
     @Provides
     @Inject
     @AsyncRepair
+    @Singleton
     public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
 
         final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
@@ -225,7 +228,7 @@ public class CoreModule extends AbstractModule {
 
 
         final ThreadPoolExecutor executor = TaskExecutorFactory
-            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+            .createTaskExecutor( poolName, threadCount, 0, TaskExecutorFactory.RejectionAction.DROP );
 
         final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
 
@@ -235,7 +238,8 @@ public class CoreModule extends AbstractModule {
 
     @Provides
     @Inject
-    @ImportRepair
+    @ResponseImportTasks
+    @Singleton
     public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
 
         final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
@@ -243,7 +247,7 @@ public class CoreModule extends AbstractModule {
 
 
         final ThreadPoolExecutor executor = TaskExecutorFactory
-            .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+            .createTaskExecutor( poolName, threadCount, 0, TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
         final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
deleted file mode 100644
index d65d04c..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Label for using the async repair scheduler
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface ImportRepair {
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
new file mode 100644
index 0000000..bf74d1f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ResponseImportTasks {
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 0a6e270..d788174 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
 import com.google.inject.AbstractModule;
 import com.google.inject.Inject;
 import com.google.inject.Provides;
+import com.google.inject.Singleton;
 
 
 /**
@@ -77,13 +78,14 @@ public abstract class CollectionModule extends AbstractModule {
     @Provides
     @Inject
     @CollectionExecutorScheduler
+    @Singleton
     public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
 
         final String poolName = collectionSchedulerFig.getIoSchedulerName();
         final int threadCount = collectionSchedulerFig.getMaxIoThreads();
 
 
-        final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+        final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, 0,
             TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
         final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor  );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index d6bbdc5..8079ad9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -363,6 +363,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
                         continue;
                     }
 
+                    //TODO, we need to validate the property in the entity matches the property in the unique value
+
+
                     //else add it to our result set
                     response.addEntity( expectedUnique.getField(), entity );
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index bd3d3e9..4ffabf7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.core.executor;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -64,7 +65,13 @@ public class TaskExecutorFactory {
                                                          final int maxQueueSize, RejectionAction rejectionAction ) {
 
 
-        final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
+        final BlockingQueue<Runnable> queue;
+
+        if(maxQueueSize == 0){
+            queue = new SynchronousQueue();
+        }else{
+            queue = new ArrayBlockingQueue<>( maxQueueSize );
+        }
 
 
         if ( rejectionAction == RejectionAction.ABORT ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 662370f..85c973e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -28,8 +28,7 @@ import java.util.UUID;
 
 import com.codahale.metrics.Timer;
 
-import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
-import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
 import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
@@ -121,7 +120,7 @@ public abstract class AbstractService implements Service {
         this.sm = sm;
         em = sm.getEntityManager();
         final Injector injector = sm.getApplicationContext().getBean( Injector.class );
-        rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+        rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ResponseImportTasks.class ) ).getAsyncIOScheduler();
         rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
         metricsFactory = injector.getInstance(MetricsFactory.class);
         this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");


[10/15] usergrid git commit: fix alias issues

Posted by to...@apache.org.
fix alias issues


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f13e45b1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f13e45b1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f13e45b1

Branch: refs/heads/master
Commit: f13e45b16561494d460a0f9c8592e319308be049
Parents: c068ab0
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:36:52 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:36:52 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java     | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f13e45b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e99c052..b5e77c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -28,7 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+import org.apache.usergrid.persistence.index.impl.*;
+import org.elasticsearch.action.index.IndexRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +57,6 @@ import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -571,26 +570,26 @@ public class AmazonAsyncEventService implements AsyncEventService {
      * @param indexOperationMessage
      */
     private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
-        Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
+        final Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
         //loop through all adds
-        indexOperationMessage.getIndexRequests().stream().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+        for(IndexOperation req : indexOperationMessage.getIndexRequests()) {
+            final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
             if(!apps.containsKey(appId)) {
                 ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
                 entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
                 apps.put(appId,true);
             }
-        });
+        };
 
         //loop through all deletes
-        indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
-            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+        for(DeIndexOperation req : indexOperationMessage.getDeIndexRequests()) {
+            final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
             if(!apps.containsKey(appId)) {
                 ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
                 entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
                 apps.put(appId,true);
             }
-        });
+        };
     }
 
 


[02/15] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/68d38f49
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/68d38f49
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/68d38f49

Branch: refs/heads/master
Commit: 68d38f496e8aeb6481cd6ea620c313fe46c40127
Parents: 419c013 c3a5bc4
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 6 20:30:30 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 6 20:30:30 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/usergrid/corepersistence/index/RxTest.java  | 5 +++--
 .../org/apache/usergrid/services/AbstractCollectionService.java | 4 +++-
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[03/15] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7da99c7f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7da99c7f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7da99c7f

Branch: refs/heads/master
Commit: 7da99c7fb69369bfd62cb0fe27f2eac74161f127
Parents: 68d38f4 204bf04
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 11 14:57:00 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 11 14:57:00 2015 -0700

----------------------------------------------------------------------
 .../shard/impl/NodeShardAllocationImpl.java     |   2 +-
 .../persistence/queue/DefaultQueueManager.java  |  58 +++++----
 .../java-wns/1.2-USERGRID/_remote.repositories  |   7 +
 ...a-wns-1.2-USERGRID-jar-with-dependencies.jar | Bin 0 -> 1836665 bytes
 .../java-wns-1.2-USERGRID-javadoc.jar           | Bin 0 -> 234289 bytes
 .../java-wns-1.2-USERGRID-sources.jar           | Bin 0 -> 21654 bytes
 .../1.2-USERGRID/java-wns-1.2-USERGRID.jar      | Bin 0 -> 33164 bytes
 .../1.2-USERGRID/java-wns-1.2-USERGRID.pom      | 128 +++++++++++++++++++
 stack/pom.xml                                   |  14 +-
 stack/rest/pom.xml                              |  10 ++
 stack/services/pom.xml                          |  10 +-
 .../services/notifications/wns/WNSAdapter.java  |   5 +-
 .../notifications/wns/WNSAdapterTest.java       |  49 +++++++
 13 files changed, 244 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[12/15] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a6936431
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a6936431
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a6936431

Branch: refs/heads/master
Commit: a6936431c98e05ec670aa9e4fdfacc63dae6544c
Parents: f840623 b895643
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 14:17:06 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 14:17:06 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 38 +++++++++++++++++---
 .../index/impl/DeIndexOperation.java            |  4 +--
 .../persistence/index/impl/IndexingUtils.java   | 21 +++++++++++
 .../index/impl/IndexingUtilsTest.java           | 36 +++++++++++++++++++
 4 files changed, 92 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a6936431/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------


[11/15] usergrid git commit: Call foreach on the set directly and only call createEntityIndex on a unique set of appIds.

Posted by to...@apache.org.
Call foreach on the set directly and only call createEntityIndex on a unique set of appIds.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b8956439
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b8956439
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b8956439

Branch: refs/heads/master
Commit: b89564398e1094203d224a1586103fe9298d12d9
Parents: f13e45b
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Nov 12 11:39:17 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Nov 12 11:39:17 2015 -0800

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8956439/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b5e77c1..e3b60a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -570,26 +570,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
      * @param indexOperationMessage
      */
     private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
-        final Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
-        //loop through all adds
-        for(IndexOperation req : indexOperationMessage.getIndexRequests()) {
-            final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            if(!apps.containsKey(appId)) {
-                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
-                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
-                apps.put(appId,true);
-            }
-        };
 
-        //loop through all deletes
-        for(DeIndexOperation req : indexOperationMessage.getDeIndexRequests()) {
-            final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            if(!apps.containsKey(appId)) {
+        // create a set so we can have a unique list of appIds for which we call createEntityIndex
+        Set<UUID> appIds = new HashSet<>();
+
+        // loop through all indexRequests and add the appIds to the set
+        indexOperationMessage.getIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // loop through all deindexRequests and add the appIds to the set
+        indexOperationMessage.getDeIndexRequests().forEach(req -> {
+            UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+            appIds.add(appId);
+        });
+
+        // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
+        appIds.forEach(appId -> {
                 ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
                 entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
-                apps.put(appId,true);
             }
-        };
+        );
     }
 
 


[09/15] usergrid git commit: fix alias issues

Posted by to...@apache.org.
fix alias issues


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c068ab0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c068ab0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c068ab0d

Branch: refs/heads/master
Commit: c068ab0d471bf8bf86dff2a9d3d30ae1baa0f24d
Parents: 3005331
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:34:46 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:34:46 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java     | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c068ab0d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a4b7ef7..e99c052 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -573,18 +571,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
      * @param indexOperationMessage
      */
     private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+        Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
         //loop through all adds
         indexOperationMessage.getIndexRequests().stream().forEach(req -> {
             UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
-            entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+            if(!apps.containsKey(appId)) {
+                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+                apps.put(appId,true);
+            }
         });
 
         //loop through all deletes
         indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
             UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
-            ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
-            entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+            if(!apps.containsKey(appId)) {
+                ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+                entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+                apps.put(appId,true);
+            }
         });
     }
 


[08/15] usergrid git commit: fix alias issues

Posted by to...@apache.org.
fix alias issues


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/30053318
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/30053318
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/30053318

Branch: refs/heads/master
Commit: 30053318fd848cf777d76af77c90edc474a579ee
Parents: fec6520
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:10:07 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:10:07 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java             | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/30053318/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 77777c2..a4b7ef7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -551,7 +551,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
-        checkInitialize(indexOperationMessage);
+        initializeEntityIndexes(indexOperationMessage);
 
         //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
         //so we'll let compaction on column expiration handle deletion
@@ -567,13 +567,20 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
-    private void checkInitialize(final IndexOperationMessage indexOperationMessage) {
+    /**
+     *     this method will call initialize for each message, since we are caching the entity indexes,
+     *     we don't worry about aggregating by app id
+     * @param indexOperationMessage
+     */
+    private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+        //loop through all adds
         indexOperationMessage.getIndexRequests().stream().forEach(req -> {
             UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
             ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
             entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
         });
 
+        //loop through all deletes
         indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
             UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
             ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);


[07/15] usergrid git commit: Address fixes found during review

Posted by to...@apache.org.
Address fixes found during review


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f840623d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f840623d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f840623d

Branch: refs/heads/master
Commit: f840623df82131429cfd564e0ad4fdeac356a93d
Parents: 7725d90
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 11:59:53 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:59:53 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  6 +++---
 .../service/ServiceSchedulerFig.java            | 22 ++++++++++++++++++--
 .../usergrid/services/AbstractService.java      |  2 +-
 3 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 650bb4d..5d2d8dc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -240,10 +240,10 @@ public class CoreModule extends AbstractModule {
     @Inject
     @ResponseImportTasks
     @Singleton
-    public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+    public RxTaskScheduler getImportRepairScheduler( final ServiceSchedulerFig serviceSchedulerFig ) {
 
-        final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
-        final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+        final String poolName = serviceSchedulerFig.getRepairPoolName();
+        final int threadCount = serviceSchedulerFig.getImportThreadPoolSize();
 
 
         final ThreadPoolExecutor executor = TaskExecutorFactory

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
index ddaa01c..e585ee3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -37,10 +37,28 @@ public interface ServiceSchedulerFig extends GuicyFig {
     String SERVICE_IMPORT_THREADS = "service.import.threads";
 
 
+    String SERVICE_IMPORT_POOL_NAME = "service.import.name";
 
-    @Default("20")
+    String SERVICE_IMPORT_CONCURRENCY = "service.import.concurrency";
+
+
+
+
+    @Default( "Usergrid-Import-Pool" )
+    @Key( SERVICE_IMPORT_POOL_NAME )
+    String getRepairPoolName();
+
+
+
+    @Default("100")
     @Key( SERVICE_IMPORT_THREADS)
-    int getImportThreads();
+    int getImportThreadPoolSize();
+
+
+
+    @Default("20")
+    @Key( SERVICE_IMPORT_CONCURRENCY)
+    int getImportConcurrency();
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 85c973e..3887f92 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -488,7 +488,7 @@ public abstract class AbstractService implements Service {
                     throw new RuntimeException(e);
                 }
             }).subscribeOn(rxScheduler);
-        }, rxSchedulerFig.getImportThreads());
+        }, rxSchedulerFig.getImportConcurrency());
 
         ObservableTimer.time(tuplesObservable, entitiesParallelGetTimer).toBlocking().lastOrDefault(null);
     }


[14/15] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release

Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6f61b05d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6f61b05d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6f61b05d

Branch: refs/heads/master
Commit: 6f61b05dd0531b54637be7e9c407f3058aa98006
Parents: a693643 63f49ba
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 14:21:54 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 14:21:54 2015 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------