You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/11/12 23:14:51 UTC
[01/15] usergrid git commit: Address cleanup not running on write for
unique values. Cleanup now happens on write.
Repository: usergrid
Updated Branches:
refs/heads/master 67ab24430 -> 14dd48d39
Address cleanup not running on write for unique values. Cleanup now happens on write.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/419c0131
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/419c0131
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/419c0131
Branch: refs/heads/master
Commit: 419c0131ff299a25c2d5c2af1186f1f11c998925
Parents: 39ccdb1
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 6 20:30:19 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 6 20:30:19 2015 -0700
----------------------------------------------------------------------
.../impl/EntityCollectionManagerImpl.java | 4 +-
.../mvcc/stage/delete/UniqueCleanup.java | 12 ++++-
.../mvcc/stage/write/WriteCommit.java | 6 +--
.../collection/EntityCollectionManagerIT.java | 55 ++++++++++++++++++++
.../mvcc/stage/delete/MarkCommitTest.java | 2 +-
.../mvcc/stage/write/WriteCommitTest.java | 2 +-
6 files changed, 74 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index ff3bd7b..cb1515c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -188,7 +188,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- final Observable<Entity> write = observable.map( writeCommit );
+ final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
+ //now extract the entity from the ioEvent, since that is what we need to return
+ .map( ioEvent -> ioEvent.getEvent().getEntity().get() );
return ObservableTimer.time( write, writeTimer );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 3f66536..8aa5cfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -89,6 +89,9 @@ public class UniqueCleanup
final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+ //if it's been deleted, we need to remove everything up to and including this version.
+ //if it has not, we want to delete everything < this version
+ final boolean isDeleted = !mvccEntityCollectionIoEvent.getEvent().getEntity().isPresent();
//TODO Refactor this logic into a class that can be invoked from anywhere
@@ -108,7 +111,14 @@ public class UniqueCleanup
logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
//TODO: should this be equals? That way we clean up the one marked as well
- return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+
+
+ if(isDeleted){
+ return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+ }
+
+ return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) >= 0;
+
} )
//buffer our buffer size, then roll them all up in a single batch mutation
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 2afb144..9b1a393 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -56,7 +56,7 @@ import rx.functions.Func1;
* data store before returning
*/
@Singleton
-public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity> {
+public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
private static final Logger LOG = LoggerFactory.getLogger( WriteCommit.class );
@@ -84,7 +84,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
@Override
- public Entity call( final CollectionIoEvent<MvccEntity> ioEvent ) {
+ public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioEvent ) {
final MvccEntity mvccEntity = ioEvent.getEvent();
MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
@@ -134,6 +134,6 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Entity>
}
- return mvccEntity.getEntity().get();
+ return ioEvent;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index 68a04d0..e6c6909 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -315,6 +315,61 @@ public class EntityCollectionManagerIT {
@Test
+ public void writeAndGetField2X() {
+
+
+ ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+
+ Entity newEntity = new Entity( new SimpleId( "test" ) );
+ Field field = new StringField( "testField", "unique", true );
+ newEntity.setField( field );
+
+ EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
+
+ Observable<Entity> observable = manager.write( newEntity );
+
+ Entity createReturned = observable.toBlocking().lastOrDefault( null );
+
+
+ assertNotNull( "Id was assigned", createReturned.getId() );
+ assertNotNull( "Version was assigned", createReturned.getVersion() );
+
+ Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+ assertNotNull( id );
+ assertEquals( newEntity.getId(), id );
+
+ Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
+ id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+ assertNull( id );
+
+
+ //ensure we clean up
+
+ Field fieldSecond = new StringField( "testField", "unique2", true );
+ newEntity.setField( fieldSecond );
+
+ Observable<Entity> observableSecond = manager.write( newEntity );
+
+ Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
+
+
+ assertNotNull( "Id was assigned", createReturnedSecond.getId() );
+ assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
+
+ Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+
+ assertNull(idFirst);
+
+ Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( idSecond );
+ assertEquals( newEntity.getId(), idSecond );
+
+
+ }
+
+
+ @Test
public void updateVersioning() {
// create entity
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index 1e8fcd1..ad6eac6 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -75,7 +75,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
//verify the observable is correct
- Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) );
+ Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/419c0131/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index fa6077f..58642d3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -87,7 +87,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
- Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) );
+ Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
//verify the log entry is correct
[15/15] usergrid git commit: Merge branch 'refs/heads/2.1-release'
Posted by to...@apache.org.
Merge branch 'refs/heads/2.1-release'
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/14dd48d3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/14dd48d3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/14dd48d3
Branch: refs/heads/master
Commit: 14dd48d39d37abf97092264003942dcb73b3ccdf
Parents: 67ab244 6f61b05
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 15:14:44 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 15:14:44 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 132 ++++++++++++++-----
.../asyncevents/AmazonAsyncEventService.java | 39 +++++-
.../asyncevents/AsyncEventsSchedulerFig.java | 94 +++++++++++++
.../asyncevents/AsyncIndexProvider.java | 2 +-
.../asyncevents/EventExecutionScheduler.java | 37 ++++++
.../traverse/ReadGraphCollectionFilter.java | 3 +-
.../traverse/ReadGraphConnectionFilter.java | 3 +-
.../corepersistence/rx/impl/AsyncRepair.java | 38 ++++++
.../rx/impl/ResponseImportTasks.java | 38 ++++++
.../service/ServiceSchedulerFig.java | 66 ++++++++++
.../collection/guice/CollectionModule.java | 32 +++++
.../guice/CollectionTaskExecutor.java | 35 -----
.../EntityCollectionManagerFactoryImpl.java | 5 +-
.../impl/EntityCollectionManagerImpl.java | 17 ++-
.../mvcc/stage/delete/UniqueCleanup.java | 12 +-
.../mvcc/stage/write/WriteCommit.java | 10 +-
.../scheduler/CollectionExecutorScheduler.java | 52 ++++++++
.../scheduler/CollectionSchedulerFig.java | 53 ++++++++
.../collection/EntityCollectionManagerIT.java | 68 ++++++++++
.../mvcc/stage/delete/MarkCommitTest.java | 2 +-
.../mvcc/stage/write/WriteCommitTest.java | 2 +-
.../core/executor/TaskExecutorFactory.java | 108 +++++++++++----
.../persistence/core/guice/CommonModule.java | 25 ++--
.../persistence/core/rx/RxSchedulerFig.java | 71 ----------
.../core/rx/RxTaskSchedulerImpl.java | 81 +-----------
.../index/impl/DeIndexOperation.java | 4 +-
.../persistence/index/impl/IndexingUtils.java | 21 +++
.../index/impl/IndexingUtilsTest.java | 36 +++++
.../usergrid/services/AbstractService.java | 13 +-
29 files changed, 815 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index ae69b6f,5d2d8dc..aacf6e9
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@@ -16,7 -16,8 +16,9 @@@
package org.apache.usergrid.corepersistence;
+import org.apache.usergrid.persistence.cache.guice.CacheModule;
+ import java.util.concurrent.ThreadPoolExecutor;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 90840af,179b3c4..adaed0f
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@@ -223,44 -221,46 +223,61 @@@ public class IndexingUtils
}
- /**
- * Parse the document id into a candidate result
- */
- public static CandidateResult parseIndexDocId( final String documentId ) {
+ public static CandidateResult parseIndexDocId( final SearchHit hit ) {
+ return parseIndexDocId(hit.getId());
+ }
+
+ public static CandidateResult parseIndexDocId( final SearchHit hit, boolean isGeo ) {
+ final String documentId = hit.getId();
+ final double distance = isGeo ? (double) hit.sortValues()[0] : -1;
+ return parseIndexDocId(documentId,distance);
+ }
+
+ public static CandidateResult parseIndexDocId( final String documentId ) {
+ return parseIndexDocId(documentId,-1);
+ }
+ /**
+ * Parse the document id into a candidate result
+ */
+ public static CandidateResult parseIndexDocId( final String documentId, final double distance ) {
- final Matcher matcher = DOCUMENT_PATTERN.matcher( documentId );
+ final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
- Preconditions.checkArgument( matcher.matches(), "Pattern for document id did not match expected format" );
- Preconditions.checkArgument( matcher.groupCount() == 9, "9 groups expected in the pattern" );
+ Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
+ Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
//Other fields can be parsed using groups. The groups start at value 1, group 0 is the entire match
- final String entityUUID = matcher.group( 3 );
- final String entityType = matcher.group( 4 );
+ final String entityUUID = matcher.group(3);
+ final String entityType = matcher.group(4);
- final String versionUUID = matcher.group( 5 );
+ final String versionUUID = matcher.group(5);
- Id entityId = new SimpleId( UUID.fromString( entityUUID ), entityType );
+ Id entityId = new SimpleId(UUID.fromString(entityUUID), entityType);
- return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
+ return distance >= 0
+ ? new GeoCandidateResult(entityId, UUID.fromString(versionUUID), documentId, distance)
+ : new CandidateResult(entityId, UUID.fromString(versionUUID), documentId);
}
+ /**
+ * Parse the application id (UUID) out of the document id
+ */
+ public static UUID parseAppIdFromIndexDocId( final String documentId) {
+
+ final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
+
+ Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
+ Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
+
+ //Other fields can be parsed using groups. The groups start at value 1, group 0 is the entire match
+ final String appUUID = matcher.group(1);
+
+ return UUID.fromString(appUUID);
+
+ }
+
/**
* Get the entity type
http://git-wip-us.apache.org/repos/asf/usergrid/blob/14dd48d3/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --cc stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index e7d4fc4,3887f92..88d87fe
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@@ -27,7 -27,9 +27,10 @@@ import java.util.Set
import java.util.UUID;
import com.codahale.metrics.Timer;
+import org.apache.usergrid.persistence.cache.CacheFactory;
+
+ import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
+ import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.slf4j.Logger;
[13/15] usergrid git commit: Merge commit 'refs/pull/433/head' of
github.com:apache/usergrid into 2.1-release
Posted by to...@apache.org.
Merge commit 'refs/pull/433/head' of github.com:apache/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/63f49bac
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/63f49bac
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/63f49bac
Branch: refs/heads/master
Commit: 63f49bac58395b8fc9b36fedd59dae297e63459f
Parents: b895643 f840623
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Nov 12 13:19:54 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Nov 12 13:19:54 2015 -0800
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 134 ++++++++++++++-----
.../asyncevents/AmazonAsyncEventService.java | 1 +
.../asyncevents/AsyncEventsSchedulerFig.java | 94 +++++++++++++
.../asyncevents/AsyncIndexProvider.java | 2 +-
.../asyncevents/EventExecutionScheduler.java | 37 +++++
.../traverse/ReadGraphCollectionFilter.java | 3 +-
.../traverse/ReadGraphConnectionFilter.java | 3 +-
.../corepersistence/rx/impl/AsyncRepair.java | 38 ++++++
.../rx/impl/ResponseImportTasks.java | 38 ++++++
.../service/ServiceSchedulerFig.java | 66 +++++++++
.../collection/guice/CollectionModule.java | 32 +++++
.../guice/CollectionTaskExecutor.java | 35 -----
.../EntityCollectionManagerFactoryImpl.java | 5 +-
.../impl/EntityCollectionManagerImpl.java | 17 ++-
.../mvcc/stage/delete/UniqueCleanup.java | 12 +-
.../mvcc/stage/write/WriteCommit.java | 10 +-
.../scheduler/CollectionExecutorScheduler.java | 52 +++++++
.../scheduler/CollectionSchedulerFig.java | 53 ++++++++
.../collection/EntityCollectionManagerIT.java | 68 ++++++++++
.../mvcc/stage/delete/MarkCommitTest.java | 2 +-
.../mvcc/stage/write/WriteCommitTest.java | 2 +-
.../core/executor/TaskExecutorFactory.java | 108 +++++++++++----
.../persistence/core/guice/CommonModule.java | 25 ++--
.../persistence/core/rx/RxSchedulerFig.java | 71 ----------
.../core/rx/RxTaskSchedulerImpl.java | 81 +----------
.../usergrid/services/AbstractService.java | 13 +-
26 files changed, 724 insertions(+), 278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/63f49bac/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
[05/15] usergrid git commit: fix alias issues
Posted by to...@apache.org.
fix alias issues
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fec6520e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fec6520e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fec6520e
Branch: refs/heads/master
Commit: fec6520eb47fa6fe7bc1d8d7fad6a729874f3475
Parents: 204bf04
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 11:07:29 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 11:47:08 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 15 ++++++++
.../index/impl/DeIndexOperation.java | 4 +--
.../persistence/index/impl/IndexingUtils.java | 21 ++++++++++++
.../index/impl/IndexingUtilsTest.java | 36 ++++++++++++++++++++
4 files changed, 74 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..77777c2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -550,6 +551,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
+ checkInitialize(indexOperationMessage);
//NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message
//so we'll let compaction on column expiration handle deletion
@@ -565,6 +567,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
+ private void checkInitialize(final IndexOperationMessage indexOperationMessage) {
+ indexOperationMessage.getIndexRequests().stream().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ });
+
+ indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ });
+ }
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index dbecf8a..aefceda 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -44,10 +44,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
public class DeIndexOperation implements BatchOperation {
@JsonProperty
- private String[] indexes;
+ public String[] indexes;
@JsonProperty
- private String documentId;
+ public String documentId;
public DeIndexOperation() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 18cb928..179b3c4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -244,6 +244,23 @@ public class IndexingUtils {
return new CandidateResult( entityId, UUID.fromString( versionUUID ), documentId );
}
+ /**
+ * Parse the application id (UUID) out of the document id
+ */
+ public static UUID parseAppIdFromIndexDocId( final String documentId) {
+
+ final Matcher matcher = DOCUMENT_PATTERN.matcher(documentId);
+
+ Preconditions.checkArgument(matcher.matches(), "Pattern for document id did not match expected format");
+ Preconditions.checkArgument(matcher.groupCount() == 9, "9 groups expected in the pattern");
+
+ //Other fields can be parsed using groups. The groups start at value 1, group 0 is the entire match
+ final String appUUID = matcher.group(1);
+
+ return UUID.fromString(appUUID);
+
+ }
+
/**
* Get the entity type
@@ -262,4 +279,8 @@ public class IndexingUtils {
sb.append( ENTITY_TYPE_NAME).append("(" ).append( type ).append( ")" );
return sb.toString();
}
+
+ public static UUID getApplicationIdFromIndexDocId(String documentId) {
+ return parseAppIdFromIndexDocId(documentId);
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/fec6520e/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
index 97389b2..d93f8a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexingUtilsTest.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseAppIdFromIndexDocId;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseIndexDocId;
import static org.junit.Assert.assertEquals;
@@ -91,6 +92,41 @@ public class IndexingUtilsTest {
@Test
+ public void testAppIdFromDocumentId() {
+
+ final ApplicationScopeImpl applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );
+
+ final Id id = new SimpleId( "id" );
+ final UUID version = UUIDGenerator.newTimeUUID();
+
+ final SearchEdgeImpl searchEdge =
+ new SearchEdgeImpl( new SimpleId( "source" ), "users", SearchEdge.NodeType.TARGET );
+
+ final String output = IndexingUtils.createIndexDocId( applicationScope, id, version, searchEdge );
+
+
+ final String expected =
+ "appId(" + applicationScope.getApplication().getUuid() + ",application).entityId(" + id.getUuid() + "," + id
+ .getType() + ").version(" + version + ").nodeId(" + searchEdge.getNodeId().getUuid() + "," + searchEdge
+ .getNodeId().getType() + ").edgeName(users).nodeType(TARGET)";
+
+
+ assertEquals( output, expected );
+
+
+ //now parse it
+
+ final CandidateResult parsedId = parseIndexDocId( output );
+
+ assertEquals(version, parsedId.getVersion());
+ assertEquals(id, parsedId.getId());
+
+ final UUID appId = parseAppIdFromIndexDocId(output);
+ assertEquals(appId,applicationScope.getApplication().getUuid());
+ }
+
+
+ @Test
public void testDocumentIdPipes() {
final ApplicationScopeImpl applicationScope = new ApplicationScopeImpl( new SimpleId( "application" ) );
[04/15] usergrid git commit: Refactored schedulers to have separate
schedulers for different tasks
Posted by to...@apache.org.
Refactored schedulers to have separate schedulers for different tasks
Also fixes a bug with unique values. Values are now validated on read to ensure that the unique value is still valid.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0e1f0e64
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0e1f0e64
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0e1f0e64
Branch: refs/heads/master
Commit: 0e1f0e64176ce13b579762c42d8358eaa7543f20
Parents: 7da99c7
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 11 16:55:00 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:27:16 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 130 ++++++++++++++-----
.../asyncevents/AmazonAsyncEventService.java | 1 +
.../asyncevents/AsyncEventsSchedulerFig.java | 94 ++++++++++++++
.../asyncevents/AsyncIndexProvider.java | 2 +-
.../asyncevents/EventExecutionScheduler.java | 37 ++++++
.../traverse/ReadGraphCollectionFilter.java | 3 +-
.../traverse/ReadGraphConnectionFilter.java | 3 +-
.../corepersistence/rx/impl/AsyncRepair.java | 38 ++++++
.../corepersistence/rx/impl/ImportRepair.java | 38 ++++++
.../service/ServiceSchedulerFig.java | 48 +++++++
.../collection/guice/CollectionModule.java | 30 +++++
.../guice/CollectionTaskExecutor.java | 35 -----
.../EntityCollectionManagerFactoryImpl.java | 5 +-
.../impl/EntityCollectionManagerImpl.java | 16 ++-
.../mvcc/stage/write/WriteCommit.java | 4 +-
.../scheduler/CollectionExecutorScheduler.java | 52 ++++++++
.../scheduler/CollectionSchedulerFig.java | 53 ++++++++
.../collection/EntityCollectionManagerIT.java | 53 +++++---
.../core/executor/TaskExecutorFactory.java | 101 ++++++++++----
.../persistence/core/guice/CommonModule.java | 25 ++--
.../persistence/core/rx/RxSchedulerFig.java | 71 ----------
.../core/rx/RxTaskSchedulerImpl.java | 81 +-----------
.../usergrid/services/AbstractService.java | 12 +-
23 files changed, 639 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 959edec..09db151 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,12 +16,16 @@
package org.apache.usergrid.corepersistence;
+import java.util.concurrent.ThreadPoolExecutor;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventsSchedulerFig;
import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventExecutionScheduler;
import org.apache.usergrid.corepersistence.index.ApplicationIndexBucketLocator;
import org.apache.usergrid.corepersistence.index.CoreIndexFig;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -42,6 +46,8 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
import org.apache.usergrid.corepersistence.service.AggregationService;
import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -51,20 +57,26 @@ import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.corepersistence.service.StatusService;
import org.apache.usergrid.corepersistence.service.StatusServiceImpl;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.index.guice.IndexModule;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -73,25 +85,22 @@ import com.google.inject.multibindings.Multibinder;
/**
* Guice Module that encapsulates Core Persistence.
*/
-public class CoreModule extends AbstractModule {
-
-
+public class CoreModule extends AbstractModule {
@Override
protected void configure() {
- install( new CommonModule());
+ install( new CommonModule() );
install( new CollectionModule() {
/**
* configure our migration data provider for all entities in the system
*/
@Override
- public void configureMigrationProvider() {
+ public void configureMigrationProvider() {
- bind(new TypeLiteral<MigrationDataProvider<EntityIdScope>>(){}).to(
- AllEntitiesInSystemImpl.class );
- }
+ bind( new TypeLiteral<MigrationDataProvider<EntityIdScope>>() {} ).to( AllEntitiesInSystemImpl.class );
+ }
} );
install( new GraphModule() {
@@ -100,30 +109,28 @@ public class CoreModule extends AbstractModule {
*/
@Override
public void configureMigrationProvider() {
- bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
- AllNodesInGraphImpl.class );
+ bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to( AllNodesInGraphImpl.class );
}
} );
- install(new IndexModule(){
+ install( new IndexModule() {
@Override
public void configureMigrationProvider() {
- bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
- AllApplicationsObservableImpl.class );
+ bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} )
+ .to( AllApplicationsObservableImpl.class );
}
- });
- // install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
- // install(new QueueModule());
+ } );
+ // install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
+ // install(new QueueModule());
- bind(ManagerCache.class).to( CpManagerCache.class );
- bind(ApplicationIdCacheFactory.class);
+ bind( ManagerCache.class ).to( CpManagerCache.class );
+ bind( ApplicationIdCacheFactory.class );
/**
* Create our migrations for within our core plugin
*/
Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(),
- new TypeLiteral<DataMigration>() {}, CoreMigration.class );
+ Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration>() {}, CoreMigration.class );
dataMigrationMultibinder.addBinding().to( DeDupConnectionDataMigration.class );
@@ -135,7 +142,7 @@ public class CoreModule extends AbstractModule {
plugins.addBinding().to( MigrationModuleVersionPlugin.class );
bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
- bind( AllEntityIdsObservable.class).to( AllEntityIdsObservableImpl.class );
+ bind( AllEntityIdsObservable.class ).to( AllEntityIdsObservableImpl.class );
/*****
@@ -143,50 +150,103 @@ public class CoreModule extends AbstractModule {
*****/
- bind( IndexService.class ).to(IndexServiceImpl.class);
+ bind( IndexService.class ).to( IndexServiceImpl.class );
//bind the event handlers
- bind( EventBuilder.class).to( EventBuilderImpl.class );
- bind(ApplicationIndexBucketLocator.class);
+ bind( EventBuilder.class ).to( EventBuilderImpl.class );
+ bind( ApplicationIndexBucketLocator.class );
//bind the queue provider
bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
- bind( ReIndexService.class).to(ReIndexServiceImpl.class);
+ bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
- install(new FactoryModuleBuilder()
- .implement(AggregationService.class, AggregationServiceImpl.class)
- .build(AggregationServiceFactory.class));
+ install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
+ .build( AggregationServiceFactory.class ) );
- bind(IndexLocationStrategyFactory.class).to( IndexLocationStrategyFactoryImpl.class );
+ bind( IndexLocationStrategyFactory.class ).to( IndexLocationStrategyFactoryImpl.class );
- install(new GuicyFigModule(IndexProcessorFig.class));
-
- install(new GuicyFigModule(CoreIndexFig.class));
+ install( new GuicyFigModule( IndexProcessorFig.class ) );
+ install( new GuicyFigModule( CoreIndexFig.class ) );
install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
install( new GuicyFigModule( EntityManagerFig.class ) );
+ install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
+
+ install( new GuicyFigModule( ServiceSchedulerFig.class ) );
+
//install our pipeline modules
- install(new PipelineModule());
+ install( new PipelineModule() );
/**
* Install our service operations
*/
- bind( CollectionService.class).to( CollectionServiceImpl.class );
+ bind( CollectionService.class ).to( CollectionServiceImpl.class );
- bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+ bind( ConnectionService.class ).to( ConnectionServiceImpl.class );
bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
bind( StatusService.class ).to( StatusServiceImpl.class );
+ }
+
+
+ @Provides
+ @Inject
+ @EventExecutionScheduler
+ public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxIoThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, threadCount, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+ return taskScheduler;
}
+
+ @Provides
+ @Inject
+ @AsyncRepair
+ public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxRepairThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+ return taskScheduler;
+ }
+
+
+ @Provides
+ @Inject
+ @ImportRepair
+ public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+ return taskScheduler;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..24ec51f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -153,6 +153,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
final QueueFig queueFig,
+ @EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
new file mode 100644
index 0000000..83eb02e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
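+ * Configuration of thread counts and pool names for the async event (IO), repair and import schedulers.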
+ */
+@FigSingleton
+public interface AsyncEventsSchedulerFig extends GuicyFig {
+
+
+ /**
+ * Number of threads to use in async event (IO) processing
+ */
+ String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+ /**
+ * Name of pool to use when performing scheduling
+ */
+ String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+ /**
+ * Number of threads to use in the repair pool
+ */
+ String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
+
+
+ /**
+ * Name of the pool to use when scheduling repair tasks
+ */
+ String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
+
+
+ /**
+ * Number of threads to use in the import pool
+ */
+ String IMPORT_SCHEDULER_THREADS = "import.io.threads";
+
+
+ /**
+ * Name of the pool to use when scheduling import tasks
+ */
+ String IMPORT_SCHEDULER_NAME = "import.io.poolName";
+
+
+ @Default( "100" )
+ @Key( IO_SCHEDULER_THREADS )
+ int getMaxIoThreads();
+
+ @Default( "Usergrid-SQS-Pool" )
+ @Key( IO_SCHEDULER_NAME )
+ String getIoSchedulerName();
+
+
+ @Default( "20" )
+ @Key( REPAIR_SCHEDULER_THREADS )
+ int getMaxRepairThreads();
+
+ @Default( "Usergrid-Repair-Pool" )
+ @Key( REPAIR_SCHEDULER_NAME )
+ String getRepairPoolName();
+
+ @Default( "100" )
+ @Key( IMPORT_SCHEDULER_THREADS )
+ int getMaxImportThreads();
+
+ @Default( "Usergrid-Import-Pool" )
+ @Key( IMPORT_SCHEDULER_NAME )
+ String getImportSchedulerName();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2bace8d..d65cffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -62,7 +62,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
new file mode 100644
index 0000000..ce09aae
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the event execution scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface EventExecutionScheduler {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 3d7df3b..3819659 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
*/
@Inject
public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @AsyncRepair final RxTaskScheduler rxTaskScheduler,
final EventBuilder eventBuilder,
final AsyncEventService asyncEventService,
@Assisted final String collectionName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index b2d368b..3c92c03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
*/
@Inject
public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @AsyncRepair final RxTaskScheduler rxTaskScheduler,
final EventBuilder eventBuilder,
final AsyncEventService asyncEventService,
@Assisted final String connectionName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
new file mode 100644
index 0000000..aa2cc12
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface AsyncRepair {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
new file mode 100644
index 0000000..d65d04c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the import repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ImportRepair {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
new file mode 100644
index 0000000..ddaa01c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Configuration for service-level scheduling (number of entity import threads).
+ */
+@FigSingleton
+public interface ServiceSchedulerFig extends GuicyFig {
+
+
+ /**
+ * The number of threads to use when importing entities into result sets
+ */
+ String SERVICE_IMPORT_THREADS = "service.import.threads";
+
+
+
+ @Default("20")
+ @Key( SERVICE_IMPORT_THREADS)
+ int getImportThreads();
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 78c7f37..0a6e270 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.collection.guice;
+import java.util.concurrent.ThreadPoolExecutor;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -25,11 +27,18 @@ import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionSchedulerFig;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
/**
@@ -45,6 +54,7 @@ public abstract class CollectionModule extends AbstractModule {
// noinspection unchecked
install( new GuicyFigModule( SerializationFig.class ) );
+ install( new GuicyFigModule( CollectionSchedulerFig.class ) );
install( new SerializationModule() );
install( new ServiceModule() );
@@ -62,6 +72,26 @@ public abstract class CollectionModule extends AbstractModule {
}
+
+
+ @Provides
+ @Inject
+ @CollectionExecutorScheduler
+ public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
+
+ final String poolName = collectionSchedulerFig.getIoSchedulerName();
+ final int threadCount = collectionSchedulerFig.getMaxIoThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+ TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor );
+
+ return taskScheduler;
+ }
+
+
/**
 * Gives callers the ability to configure an instance of
*
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
deleted file mode 100644
index 53c1f48..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface CollectionTaskExecutor {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 45cee06..a52ee9c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
@@ -74,7 +75,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
- private final EntityCacheFig entityCacheFig;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
@@ -107,7 +107,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final Keyspace keyspace, final EntityCacheFig entityCacheFig,
- MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
+ final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler ) {
this.writeStart = writeStart;
this.writeVerifyUnique = writeVerifyUnique;
@@ -123,7 +123,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
this.keyspace = keyspace;
- this.entityCacheFig = entityCacheFig;
this.metricsFactory = metricsFactory;
this.rxTaskScheduler = rxTaskScheduler;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index cb1515c..d6bbdc5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.codahale.metrics.Timer;
@@ -117,7 +118,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final Timer deleteTimer;
private final Timer fieldIdTimer;
private final Timer fieldEntityTimer;
- private final Timer updateTimer;
private final Timer loadTimer;
private final Timer getLatestTimer;
@@ -165,7 +165,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.delete");
this.fieldIdTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldId");
this.fieldEntityTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldEntity");
- this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.update");
this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.load");
this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.latest");
}
@@ -188,8 +187,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
- //now extract the ioEvent we need to return
+ final Observable<Entity> write = observable.map( writeCommit )
+ .map(ioEvent -> {
+ //fire this in the background so we don't block writes
+ Observable.just( ioEvent ).compose( uniqueCleanup ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+ return ioEvent;
+ }
+ )
+ //now extract the ioEvent we need to return and update the version
.map( ioEvent -> ioEvent.getEvent().getEntity().get() );
return ObservableTimer.time( write, writeTimer );
@@ -358,7 +363,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
continue;
}
-
//else add it to our result set
response.addEntity( expectedUnique.getField(), entity );
}
@@ -380,6 +384,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
+
+
// fire the stages
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 9b1a393..fe3f9a9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -94,7 +94,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
final ApplicationScope applicationScope = ioEvent.getEntityCollection();
//set the version into the entity
- EntityUtils.setVersion( mvccEntity.getEntity().get(), version );
+ final Entity entity = mvccEntity.getEntity().get();
+
+ EntityUtils.setVersion( entity, version );
MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
ValidationUtils.verifyTimeUuid( version ,"version" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
new file mode 100644
index 0000000..8f8aa00
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionExecutorScheduler {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
new file mode 100644
index 0000000..daefa9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
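+ * Configuration for the thread pool that backs the collection module's task scheduler (pool name and thread count).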
+ */
+@FigSingleton
+public interface CollectionSchedulerFig extends GuicyFig {
+
+
+ /**
+ * Number of threads to use in async processing
+ */
+ String COLLECTION_SCHEDULER_THREADS = "scheduler.collection.threads";
+
+
+ /**
+ * Name of pool to use when performing scheduling
+ */
+ String COLLECTION_SCHEDULER_NAME = "scheduler.collection.poolName";
+
+
+ @Default( "20" )
+ @Key( COLLECTION_SCHEDULER_THREADS )
+ int getMaxIoThreads();
+
+ @Default( "Usergrid-Collection-Pool" )
+ @Key( COLLECTION_SCHEDULER_NAME )
+ String getIoSchedulerName();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index e6c6909..115be99 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -58,6 +59,7 @@ import rx.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -315,18 +317,17 @@ public class EntityCollectionManagerIT {
@Test
- public void writeAndGetField2X() {
-
-
+ public void writeAndGetField2X() throws InterruptedException {
ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
- Entity newEntity = new Entity( new SimpleId( "test" ) );
- Field field = new StringField( "testField", "unique", true );
- newEntity.setField( field );
+ final Id entityId = new SimpleId( "test" );
+ Entity firstInstance = new Entity( entityId );
+ Field firstField = new StringField( "testField", "unique", true );
+ firstInstance.setField( firstField );
EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write( newEntity );
+ Observable<Entity> observable = manager.write( firstInstance );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
@@ -334,21 +335,22 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
assertNotNull( "Version was assigned", createReturned.getVersion() );
- Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
- assertNotNull( id );
- assertEquals( newEntity.getId(), id );
+ final Id existingId = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+ assertNotNull( existingId );
+ assertEquals( firstInstance.getId(), existingId );
Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
- id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
- assertNull( id );
+ final Id noId = manager.getIdField( firstInstance.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+ assertNull( noId );
//ensure we clean up
- Field fieldSecond = new StringField( "testField", "unique2", true );
- newEntity.setField( fieldSecond );
+ Entity secondInstance = new Entity( entityId );
+ Field secondField = new StringField( firstField.getName(), "unique2", true );
+ secondInstance.setField( secondField );
- Observable<Entity> observableSecond = manager.write( newEntity );
+ Observable<Entity> observableSecond = manager.write( secondInstance );
Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
@@ -356,16 +358,27 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturnedSecond.getId() );
assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
- Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+ assertNotEquals( "Versions should not be equal", createReturned.getVersion(), createReturnedSecond.getVersion() );
- assertNull(idFirst);
+ //sanity check, get the entity to ensure it's the right version
- Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+ final Entity loadedVersion = manager.load( entityId ).toBlocking().last();
- assertNotNull( idSecond );
- assertEquals( newEntity.getId(), idSecond );
+ assertEquals(entityId, loadedVersion.getId());
+ assertEquals(createReturnedSecond.getVersion(), loadedVersion.getVersion());
+ //give cleanup time to run. need to finish the TODO below
+ Thread.sleep( 2000 );
+ //TODO, we need to implement verify and repair on this
+ final Id idFirst = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+ assertNull(idFirst);
+
+
+ final Id idSecond = manager.getIdField( secondInstance.getId().getType(), secondField ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( idSecond );
+ assertEquals( secondInstance.getId(), idSecond );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index 3c6a750..bd3d3e9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,30 +20,45 @@
package org.apache.usergrid.persistence.core.executor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A task executor that allows you to submit tasks
*/
public class TaskExecutorFactory {
- private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+ private static final Logger log = LoggerFactory.getLogger( TaskExecutorFactory.class );
+
public enum RejectionAction {
+ /**
+ * If there is no capacity left, throw an exception
+ */
ABORT,
- CALLERRUNS
+ /**
+ * If there is no capacity left, the caller runs the callable
+ */
+ CALLERRUNS,
+
+ /**
+ * If there is no capacity left, the request is logged and then silently dropped
+ */
+ DROP
}
+
+
/**
* Create a task executor
- * @param schedulerName
- * @param maxThreadCount
- * @param maxQueueSize
- * @return
*/
public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
final int maxQueueSize, RejectionAction rejectionAction ) {
@@ -52,22 +67,22 @@ public class TaskExecutorFactory {
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
- if(rejectionAction.equals(RejectionAction.ABORT)){
-
+ if ( rejectionAction == RejectionAction.ABORT ) {
return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
-
}
- else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+ else if ( rejectionAction == RejectionAction.CALLERRUNS ) {
return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
-
- }else{
- //default to the thread pool with ABORT policy
- return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
}
-
+ else if ( rejectionAction == RejectionAction.DROP ) {
+ return new MaxSizeThreadPoolDrops( queue, schedulerName, maxThreadCount );
+ }
+ else {
+ throw new IllegalArgumentException( "Unable to create a scheduler with the arguments provided" );
+ }
}
+
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
@@ -78,14 +93,29 @@ public class TaskExecutorFactory {
}
}
+
/**
* Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
*/
private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
- public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
- super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
- new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+ public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName,
+ final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+ new CallerRunsHandler( poolName ) );
+ }
+ }
+
+
+ /**
+ * Create a thread pool that will log and then silently drop rejected tasks if our tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPoolDrops extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPoolDrops( final BlockingQueue<Runnable> queue, final String poolName,
+ final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+ new DropHandler( poolName ) );
}
}
@@ -111,29 +141,50 @@ public class TaskExecutorFactory {
Thread t = new Thread( r, threadName );
//set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon(true);
+ t.setDaemon( true );
return t;
}
}
+
/**
* The handler that will handle rejected executions and signal the interface
*/
- private static final class RejectedHandler implements RejectedExecutionHandler {
+ private static final class CallerRunsHandler implements RejectedExecutionHandler {
private final String poolName;
- private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+ private CallerRunsHandler( final String poolName ) {this.poolName = poolName;}
+
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+ log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r,
+ Thread.currentThread().getName() );
//We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
r.run();
}
+ }
+
+
+ /**
+ * The handler that will handle rejected executions by logging a warning and dropping the task
+ */
+ private static final class DropHandler implements RejectedExecutionHandler {
+
+ private final String poolName;
+
+ private DropHandler( final String poolName ) {this.poolName = poolName;}
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ log.warn( "{} task queue full, dropping task {}", poolName, r );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index b93ba76..75e2b29 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,11 +19,6 @@
package org.apache.usergrid.persistence.core.guice;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFig;
-import org.apache.usergrid.persistence.core.migration.data.*;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -32,14 +27,21 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCacheImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import com.google.inject.AbstractModule;
import com.google.inject.Key;
@@ -91,16 +93,11 @@ public class CommonModule extends AbstractModule {
Multibinder.newSetBinder(binder(), MigrationPlugin.class);
- /**
- * RX java scheduler configuration
- */
-
- install(new GuicyFigModule(RxSchedulerFig.class));
install(new GuicyFigModule(ClusterFig.class));
bind(SettingsValidationCluster.class).asEagerSingleton(); //validate props from ClusterFig on startup
- bind(RxTaskScheduler.class).to(RxTaskSchedulerImpl.class);
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
deleted file mode 100644
index 4511518..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- *
- */
-@FigSingleton
-public interface RxSchedulerFig extends GuicyFig {
-
-
- /**
- * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple
- * backpressure
- */
- String IO_SCHEDULER_THREADS = "scheduler.io.threads";
-
-
- /**
- * Amount of time in milliseconds to wait when ES rejects our request before retrying. Provides simple
- * backpressure
- */
- String IO_SCHEDULER_NAME = "scheduler.io.poolName";
-
- /**
- * The number of threads to use when importing entities into result sets
- */
- String IO_IMPORT_THREADS = "scheduler.import.threads";
-
-
-
-
- @Default( "100" )
- @Key( IO_SCHEDULER_THREADS )
- int getMaxIoThreads();
-
- @Default( "Usergrid-RxIOPool" )
- @Key(IO_SCHEDULER_NAME)
- String getIoSchedulerName();
-
- @Default("20")
- @Key( IO_IMPORT_THREADS)
- int getImportThreads();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
index dce46cb..261cbeb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -20,18 +20,9 @@
package org.apache.usergrid.persistence.core.rx;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -42,29 +33,17 @@ import rx.schedulers.Schedulers;
/**
* An implementation of the task scheduler that allows us to control the number of I/O threads
*/
-@Singleton
public class RxTaskSchedulerImpl implements RxTaskScheduler {
- private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class );
-
private final Scheduler scheduler;
- private final String poolName;
@Inject
- public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
-
- this.poolName = schedulerFig.getIoSchedulerName();
-
- final int poolSize = schedulerFig.getMaxIoThreads();
-
-
- final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
-
+ public RxTaskSchedulerImpl(final ThreadPoolExecutor executor){
- final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
+ Preconditions.checkNotNull( executor , "executor must not be null");
- this.scheduler = Schedulers.from(threadPool);
+ this.scheduler = Schedulers.from(executor);
}
@@ -76,56 +55,4 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler {
}
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) {
-
- super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), new RejectedHandler() );
- }
- }
-
-
- /**
- * Thread factory that will name and count threads for easier debugging
- */
- private final class CountingThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- final String threadName = poolName + "-" + newValue;
-
- Thread t = new Thread( r, threadName );
-
- //set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon( true );
-
- return t;
- }
- }
-
-
- /**
- * The handler that will handle rejected executions and signal the interface
- */
- private final class RejectedHandler implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
-
- //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
-
- r.run();
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index d032589..662370f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -27,6 +27,10 @@ import java.util.Set;
import java.util.UUID;
import com.codahale.metrics.Timer;
+
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.slf4j.Logger;
@@ -42,7 +46,6 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import org.apache.usergrid.services.ServiceParameter.IdParameter;
@@ -54,6 +57,7 @@ import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
import com.google.inject.Injector;
+import com.google.inject.Key;
import rx.Observable;
import rx.Scheduler;
@@ -100,7 +104,7 @@ public abstract class AbstractService implements Service {
protected Map<String, Object> defaultEntityMetadata;
private Scheduler rxScheduler;
- private RxSchedulerFig rxSchedulerFig;
+ private ServiceSchedulerFig rxSchedulerFig;
private MetricsFactory metricsFactory;
private Timer entityGetTimer;
private Timer entitiesGetTimer;
@@ -117,8 +121,8 @@ public abstract class AbstractService implements Service {
this.sm = sm;
em = sm.getEntityManager();
final Injector injector = sm.getApplicationContext().getBean( Injector.class );
- rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
- rxSchedulerFig = injector.getInstance(RxSchedulerFig.class);
+ rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+ rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
metricsFactory = injector.getInstance(MetricsFactory.class);
this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");
this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), "importEntities.get");
[06/15] usergrid git commit: Addresses issues found in the review.
Posted by to...@apache.org.
Addresses issues found in the review.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7725d90e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7725d90e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7725d90e
Branch: refs/heads/master
Commit: 7725d90ec504f2002712636cfcad6a395de21226
Parents: 0e1f0e6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 11:54:12 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:54:12 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 12 ++++---
.../corepersistence/rx/impl/ImportRepair.java | 38 --------------------
.../rx/impl/ResponseImportTasks.java | 38 ++++++++++++++++++++
.../collection/guice/CollectionModule.java | 4 ++-
.../impl/EntityCollectionManagerImpl.java | 3 ++
.../core/executor/TaskExecutorFactory.java | 9 ++++-
.../usergrid/services/AbstractService.java | 5 ++-
7 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 09db151..650bb4d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -47,7 +47,7 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
-import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
import org.apache.usergrid.corepersistence.service.AggregationService;
import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -77,6 +77,7 @@ import org.apache.usergrid.persistence.index.guice.IndexModule;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -200,6 +201,7 @@ public class CoreModule extends AbstractModule {
@Provides
@Inject
@EventExecutionScheduler
+ @Singleton
public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
@@ -218,6 +220,7 @@ public class CoreModule extends AbstractModule {
@Provides
@Inject
@AsyncRepair
+ @Singleton
public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
@@ -225,7 +228,7 @@ public class CoreModule extends AbstractModule {
final ThreadPoolExecutor executor = TaskExecutorFactory
- .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+ .createTaskExecutor( poolName, threadCount, 0, TaskExecutorFactory.RejectionAction.DROP );
final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
@@ -235,7 +238,8 @@ public class CoreModule extends AbstractModule {
@Provides
@Inject
- @ImportRepair
+ @ResponseImportTasks
+ @Singleton
public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
@@ -243,7 +247,7 @@ public class CoreModule extends AbstractModule {
final ThreadPoolExecutor executor = TaskExecutorFactory
- .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+ .createTaskExecutor( poolName, threadCount, 0, TaskExecutorFactory.RejectionAction.CALLERRUNS );
final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
deleted file mode 100644
index d65d04c..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-/**
- * Label for using the async repair scheduler
- */
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface ImportRepair {
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
new file mode 100644
index 0000000..bf74d1f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ResponseImportTasks.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Binding annotation that selects the RxTaskScheduler used for response import tasks
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ResponseImportTasks {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 0a6e270..d788174 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
+import com.google.inject.Singleton;
/**
@@ -77,13 +78,14 @@ public abstract class CollectionModule extends AbstractModule {
@Provides
@Inject
@CollectionExecutorScheduler
+ @Singleton
public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
final String poolName = collectionSchedulerFig.getIoSchedulerName();
final int threadCount = collectionSchedulerFig.getMaxIoThreads();
- final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+ final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, 0,
TaskExecutorFactory.RejectionAction.CALLERRUNS );
final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index d6bbdc5..8079ad9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -363,6 +363,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
continue;
}
+ //TODO: validate that the property in the entity matches the property in the unique value
+
+
//else add it to our result set
response.addEntity( expectedUnique.getField(), entity );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index bd3d3e9..4ffabf7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.core.executor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -64,7 +65,13 @@ public class TaskExecutorFactory {
final int maxQueueSize, RejectionAction rejectionAction ) {
- final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
+ final BlockingQueue<Runnable> queue;
+
+ if(maxQueueSize == 0){
+ queue = new SynchronousQueue();
+ }else{
+ queue = new ArrayBlockingQueue<>( maxQueueSize );
+ }
if ( rejectionAction == RejectionAction.ABORT ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7725d90e/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 662370f..85c973e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -28,8 +28,7 @@ import java.util.UUID;
import com.codahale.metrics.Timer;
-import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
-import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ResponseImportTasks;
import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
@@ -121,7 +120,7 @@ public abstract class AbstractService implements Service {
this.sm = sm;
em = sm.getEntityManager();
final Injector injector = sm.getApplicationContext().getBean( Injector.class );
- rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+ rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ResponseImportTasks.class ) ).getAsyncIOScheduler();
rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
metricsFactory = injector.getInstance(MetricsFactory.class);
this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");
[10/15] usergrid git commit: fix alias issues
Posted by to...@apache.org.
fix alias issues
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f13e45b1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f13e45b1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f13e45b1
Branch: refs/heads/master
Commit: f13e45b16561494d460a0f9c8592e319308be049
Parents: c068ab0
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:36:52 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:36:52 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f13e45b1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e99c052..b5e77c1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -28,7 +28,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+import org.apache.usergrid.persistence.index.impl.*;
+import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +57,6 @@ import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
@@ -571,26 +570,26 @@ public class AmazonAsyncEventService implements AsyncEventService {
* @param indexOperationMessage
*/
private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
- Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
+ final Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
//loop through all adds
- indexOperationMessage.getIndexRequests().stream().forEach(req -> {
- UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ for(IndexOperation req : indexOperationMessage.getIndexRequests()) {
+ final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
if(!apps.containsKey(appId)) {
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
apps.put(appId,true);
}
- });
+ };
//loop through all deletes
- indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
- UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ for(DeIndexOperation req : indexOperationMessage.getDeIndexRequests()) {
+ final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
if(!apps.containsKey(appId)) {
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
apps.put(appId,true);
}
- });
+ };
}
[02/15] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/68d38f49
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/68d38f49
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/68d38f49
Branch: refs/heads/master
Commit: 68d38f496e8aeb6481cd6ea620c313fe46c40127
Parents: 419c013 c3a5bc4
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Nov 6 20:30:30 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Nov 6 20:30:30 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/usergrid/corepersistence/index/RxTest.java | 5 +++--
.../org/apache/usergrid/services/AbstractCollectionService.java | 4 +++-
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
[03/15] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7da99c7f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7da99c7f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7da99c7f
Branch: refs/heads/master
Commit: 7da99c7fb69369bfd62cb0fe27f2eac74161f127
Parents: 68d38f4 204bf04
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 11 14:57:00 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 11 14:57:00 2015 -0700
----------------------------------------------------------------------
.../shard/impl/NodeShardAllocationImpl.java | 2 +-
.../persistence/queue/DefaultQueueManager.java | 58 +++++----
.../java-wns/1.2-USERGRID/_remote.repositories | 7 +
...a-wns-1.2-USERGRID-jar-with-dependencies.jar | Bin 0 -> 1836665 bytes
.../java-wns-1.2-USERGRID-javadoc.jar | Bin 0 -> 234289 bytes
.../java-wns-1.2-USERGRID-sources.jar | Bin 0 -> 21654 bytes
.../1.2-USERGRID/java-wns-1.2-USERGRID.jar | Bin 0 -> 33164 bytes
.../1.2-USERGRID/java-wns-1.2-USERGRID.pom | 128 +++++++++++++++++++
stack/pom.xml | 14 +-
stack/rest/pom.xml | 10 ++
stack/services/pom.xml | 10 +-
.../services/notifications/wns/WNSAdapter.java | 5 +-
.../notifications/wns/WNSAdapterTest.java | 49 +++++++
13 files changed, 244 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
[12/15] usergrid git commit: Merge branch '2.1-release' of
https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a6936431
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a6936431
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a6936431
Branch: refs/heads/master
Commit: a6936431c98e05ec670aa9e4fdfacc63dae6544c
Parents: f840623 b895643
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 14:17:06 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 14:17:06 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 38 +++++++++++++++++---
.../index/impl/DeIndexOperation.java | 4 +--
.../persistence/index/impl/IndexingUtils.java | 21 +++++++++++
.../index/impl/IndexingUtilsTest.java | 36 +++++++++++++++++++
4 files changed, 92 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a6936431/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
[11/15] usergrid git commit: Call foreach on the set directly and
only call createEntityIndex on a unique set of appIds.
Posted by to...@apache.org.
Call foreach on the set directly and only call createEntityIndex on a unique set of appIds.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b8956439
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b8956439
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b8956439
Branch: refs/heads/master
Commit: b89564398e1094203d224a1586103fe9298d12d9
Parents: f13e45b
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Nov 12 11:39:17 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Nov 12 11:39:17 2015 -0800
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 34 +++++++++++---------
1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b8956439/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b5e77c1..e3b60a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -570,26 +570,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
* @param indexOperationMessage
*/
private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
- final Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
- //loop through all adds
- for(IndexOperation req : indexOperationMessage.getIndexRequests()) {
- final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- if(!apps.containsKey(appId)) {
- ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
- entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
- apps.put(appId,true);
- }
- };
- //loop through all deletes
- for(DeIndexOperation req : indexOperationMessage.getDeIndexRequests()) {
- final UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- if(!apps.containsKey(appId)) {
+ // create a set so we can have a unique list of appIds for which we call createEntityIndex
+ Set<UUID> appIds = new HashSet<>();
+
+ // loop through all indexRequests and add the appIds to the set
+ indexOperationMessage.getIndexRequests().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ appIds.add(appId);
+ });
+
+ // loop through all deindexRequests and add the appIds to the set
+ indexOperationMessage.getDeIndexRequests().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ appIds.add(appId);
+ });
+
+ // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
+ appIds.forEach(appId -> {
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
- apps.put(appId,true);
}
- };
+ );
}
[09/15] usergrid git commit: fix alias issues
Posted by to...@apache.org.
fix alias issues
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c068ab0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c068ab0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c068ab0d
Branch: refs/heads/master
Commit: c068ab0d471bf8bf86dff2a9d3d30ae1baa0f24d
Parents: 3005331
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:34:46 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:34:46 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 19 ++++++++++++-------
1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c068ab0d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index a4b7ef7..e99c052 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.io.IOException;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -573,18 +571,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
* @param indexOperationMessage
*/
private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+ Map<UUID,Boolean> apps = new HashMap<>(indexOperationMessage.getIndexRequests().size()+indexOperationMessage.getDeIndexRequests().size());
//loop through all adds
indexOperationMessage.getIndexRequests().stream().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
- entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ if(!apps.containsKey(appId)) {
+ ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ apps.put(appId,true);
+ }
});
//loop through all deletes
indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
- entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ if(!apps.containsKey(appId)) {
+ ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ apps.put(appId,true);
+ }
});
}
[08/15] usergrid git commit: fix alias issues
Posted by to...@apache.org.
fix alias issues
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/30053318
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/30053318
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/30053318
Branch: refs/heads/master
Commit: 30053318fd848cf777d76af77c90edc474a579ee
Parents: fec6520
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Nov 12 12:10:07 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Nov 12 12:10:07 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/30053318/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 77777c2..a4b7ef7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -551,7 +551,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
- checkInitialize(indexOperationMessage);
+ initializeEntityIndexes(indexOperationMessage);
//NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message
//so we'll let compaction on column expiration handle deletion
@@ -567,13 +567,20 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- private void checkInitialize(final IndexOperationMessage indexOperationMessage) {
+ /**
+ * this method will call initialize for each message, since we are caching the entity indexes,
+ * we don't worry about aggregating by app id
+ * @param indexOperationMessage
+ */
+ private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+ //loop through all adds
indexOperationMessage.getIndexRequests().stream().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
});
+ //loop through all deletes
indexOperationMessage.getDeIndexRequests().stream().forEach(req -> {
UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
[07/15] usergrid git commit: Address fixes found during review
Posted by to...@apache.org.
Address fixes found during review
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f840623d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f840623d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f840623d
Branch: refs/heads/master
Commit: f840623df82131429cfd564e0ad4fdeac356a93d
Parents: 7725d90
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 11:59:53 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:59:53 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 6 +++---
.../service/ServiceSchedulerFig.java | 22 ++++++++++++++++++--
.../usergrid/services/AbstractService.java | 2 +-
3 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 650bb4d..5d2d8dc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -240,10 +240,10 @@ public class CoreModule extends AbstractModule {
@Inject
@ResponseImportTasks
@Singleton
- public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+ public RxTaskScheduler getImportRepairScheduler( final ServiceSchedulerFig serviceSchedulerFig ) {
- final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
- final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+ final String poolName = serviceSchedulerFig.getRepairPoolName();
+ final int threadCount = serviceSchedulerFig.getImportThreadPoolSize();
final ThreadPoolExecutor executor = TaskExecutorFactory
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
index ddaa01c..e585ee3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -37,10 +37,28 @@ public interface ServiceSchedulerFig extends GuicyFig {
String SERVICE_IMPORT_THREADS = "service.import.threads";
+ String SERVICE_IMPORT_POOL_NAME = "service.import.name";
- @Default("20")
+ String SERVICE_IMPORT_CONCURRENCY = "service.import.concurrency";
+
+
+
+
+ @Default( "Usergrid-Import-Pool" )
+ @Key( SERVICE_IMPORT_POOL_NAME )
+ String getRepairPoolName();
+
+
+
+ @Default("100")
@Key( SERVICE_IMPORT_THREADS)
- int getImportThreads();
+ int getImportThreadPoolSize();
+
+
+
+ @Default("20")
+ @Key( SERVICE_IMPORT_CONCURRENCY)
+ int getImportConcurrency();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f840623d/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index 85c973e..3887f92 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -488,7 +488,7 @@ public abstract class AbstractService implements Service {
throw new RuntimeException(e);
}
}).subscribeOn(rxScheduler);
- }, rxSchedulerFig.getImportThreads());
+ }, rxSchedulerFig.getImportConcurrency());
ObservableTimer.time(tuplesObservable, entitiesParallelGetTimer).toBlocking().lastOrDefault(null);
}
[14/15] usergrid git commit: Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Posted by to...@apache.org.
Merge branch '2.1-release' of https://git-wip-us.apache.org/repos/asf/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6f61b05d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6f61b05d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6f61b05d
Branch: refs/heads/master
Commit: 6f61b05dd0531b54637be7e9c407f3058aa98006
Parents: a693643 63f49ba
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 12 14:21:54 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 14:21:54 2015 -0700
----------------------------------------------------------------------
----------------------------------------------------------------------