You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/11/18 17:27:29 UTC
[08/50] usergrid git commit: Refactored schedulers to have separate
schedulers for different tasks
Refactored schedulers to have separate schedulers for different tasks
Also fixes a bug with unique values. Values are now validated on read to ensure that the unique value is still valid.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0e1f0e64
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0e1f0e64
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0e1f0e64
Branch: refs/heads/USERGRID-872
Commit: 0e1f0e64176ce13b579762c42d8358eaa7543f20
Parents: 7da99c7
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 11 16:55:00 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 12 11:27:16 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 130 ++++++++++++++-----
.../asyncevents/AmazonAsyncEventService.java | 1 +
.../asyncevents/AsyncEventsSchedulerFig.java | 94 ++++++++++++++
.../asyncevents/AsyncIndexProvider.java | 2 +-
.../asyncevents/EventExecutionScheduler.java | 37 ++++++
.../traverse/ReadGraphCollectionFilter.java | 3 +-
.../traverse/ReadGraphConnectionFilter.java | 3 +-
.../corepersistence/rx/impl/AsyncRepair.java | 38 ++++++
.../corepersistence/rx/impl/ImportRepair.java | 38 ++++++
.../service/ServiceSchedulerFig.java | 48 +++++++
.../collection/guice/CollectionModule.java | 30 +++++
.../guice/CollectionTaskExecutor.java | 35 -----
.../EntityCollectionManagerFactoryImpl.java | 5 +-
.../impl/EntityCollectionManagerImpl.java | 16 ++-
.../mvcc/stage/write/WriteCommit.java | 4 +-
.../scheduler/CollectionExecutorScheduler.java | 52 ++++++++
.../scheduler/CollectionSchedulerFig.java | 53 ++++++++
.../collection/EntityCollectionManagerIT.java | 53 +++++---
.../core/executor/TaskExecutorFactory.java | 101 ++++++++++----
.../persistence/core/guice/CommonModule.java | 25 ++--
.../persistence/core/rx/RxSchedulerFig.java | 71 ----------
.../core/rx/RxTaskSchedulerImpl.java | 81 +-----------
.../usergrid/services/AbstractService.java | 12 +-
23 files changed, 639 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 959edec..09db151 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,12 +16,16 @@
package org.apache.usergrid.corepersistence;
+import java.util.concurrent.ThreadPoolExecutor;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventsSchedulerFig;
import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
+import org.apache.usergrid.corepersistence.asyncevents.EventExecutionScheduler;
import org.apache.usergrid.corepersistence.index.ApplicationIndexBucketLocator;
import org.apache.usergrid.corepersistence.index.CoreIndexFig;
import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
@@ -42,6 +46,8 @@ import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl;
import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
import org.apache.usergrid.corepersistence.service.AggregationService;
import org.apache.usergrid.corepersistence.service.AggregationServiceFactory;
import org.apache.usergrid.corepersistence.service.AggregationServiceImpl;
@@ -51,20 +57,26 @@ import org.apache.usergrid.corepersistence.service.CollectionService;
import org.apache.usergrid.corepersistence.service.CollectionServiceImpl;
import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.corepersistence.service.StatusService;
import org.apache.usergrid.corepersistence.service.StatusServiceImpl;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.index.guice.IndexModule;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -73,25 +85,22 @@ import com.google.inject.multibindings.Multibinder;
/**
* Guice Module that encapsulates Core Persistence.
*/
-public class CoreModule extends AbstractModule {
-
-
+public class CoreModule extends AbstractModule {
@Override
protected void configure() {
- install( new CommonModule());
+ install( new CommonModule() );
install( new CollectionModule() {
/**
* configure our migration data provider for all entities in the system
*/
@Override
- public void configureMigrationProvider() {
+ public void configureMigrationProvider() {
- bind(new TypeLiteral<MigrationDataProvider<EntityIdScope>>(){}).to(
- AllEntitiesInSystemImpl.class );
- }
+ bind( new TypeLiteral<MigrationDataProvider<EntityIdScope>>() {} ).to( AllEntitiesInSystemImpl.class );
+ }
} );
install( new GraphModule() {
@@ -100,30 +109,28 @@ public class CoreModule extends AbstractModule {
*/
@Override
public void configureMigrationProvider() {
- bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
- AllNodesInGraphImpl.class );
+ bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to( AllNodesInGraphImpl.class );
}
} );
- install(new IndexModule(){
+ install( new IndexModule() {
@Override
public void configureMigrationProvider() {
- bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
- AllApplicationsObservableImpl.class );
+ bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} )
+ .to( AllApplicationsObservableImpl.class );
}
- });
- // install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
- // install(new QueueModule());
+ } );
+ // install(new MapModule()); TODO, re-enable when index module doesn't depend on queue
+ // install(new QueueModule());
- bind(ManagerCache.class).to( CpManagerCache.class );
- bind(ApplicationIdCacheFactory.class);
+ bind( ManagerCache.class ).to( CpManagerCache.class );
+ bind( ApplicationIdCacheFactory.class );
/**
* Create our migrations for within our core plugin
*/
Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(),
- new TypeLiteral<DataMigration>() {}, CoreMigration.class );
+ Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration>() {}, CoreMigration.class );
dataMigrationMultibinder.addBinding().to( DeDupConnectionDataMigration.class );
@@ -135,7 +142,7 @@ public class CoreModule extends AbstractModule {
plugins.addBinding().to( MigrationModuleVersionPlugin.class );
bind( AllApplicationsObservable.class ).to( AllApplicationsObservableImpl.class );
- bind( AllEntityIdsObservable.class).to( AllEntityIdsObservableImpl.class );
+ bind( AllEntityIdsObservable.class ).to( AllEntityIdsObservableImpl.class );
/*****
@@ -143,50 +150,103 @@ public class CoreModule extends AbstractModule {
*****/
- bind( IndexService.class ).to(IndexServiceImpl.class);
+ bind( IndexService.class ).to( IndexServiceImpl.class );
//bind the event handlers
- bind( EventBuilder.class).to( EventBuilderImpl.class );
- bind(ApplicationIndexBucketLocator.class);
+ bind( EventBuilder.class ).to( EventBuilderImpl.class );
+ bind( ApplicationIndexBucketLocator.class );
//bind the queue provider
bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
- bind( ReIndexService.class).to(ReIndexServiceImpl.class);
+ bind( ReIndexService.class ).to( ReIndexServiceImpl.class );
- install(new FactoryModuleBuilder()
- .implement(AggregationService.class, AggregationServiceImpl.class)
- .build(AggregationServiceFactory.class));
+ install( new FactoryModuleBuilder().implement( AggregationService.class, AggregationServiceImpl.class )
+ .build( AggregationServiceFactory.class ) );
- bind(IndexLocationStrategyFactory.class).to( IndexLocationStrategyFactoryImpl.class );
+ bind( IndexLocationStrategyFactory.class ).to( IndexLocationStrategyFactoryImpl.class );
- install(new GuicyFigModule(IndexProcessorFig.class));
-
- install(new GuicyFigModule(CoreIndexFig.class));
+ install( new GuicyFigModule( IndexProcessorFig.class ) );
+ install( new GuicyFigModule( CoreIndexFig.class ) );
install( new GuicyFigModule( ApplicationIdCacheFig.class ) );
install( new GuicyFigModule( EntityManagerFig.class ) );
+ install( new GuicyFigModule( AsyncEventsSchedulerFig.class ) );
+
+ install( new GuicyFigModule( ServiceSchedulerFig.class ) );
+
//install our pipeline modules
- install(new PipelineModule());
+ install( new PipelineModule() );
/**
* Install our service operations
*/
- bind( CollectionService.class).to( CollectionServiceImpl.class );
+ bind( CollectionService.class ).to( CollectionServiceImpl.class );
- bind( ConnectionService.class).to( ConnectionServiceImpl.class);
+ bind( ConnectionService.class ).to( ConnectionServiceImpl.class );
bind( ApplicationService.class ).to( ApplicationServiceImpl.class );
bind( StatusService.class ).to( StatusServiceImpl.class );
+ }
+
+
+ @Provides
+ @Inject
+ @EventExecutionScheduler
+ public RxTaskScheduler getSqsTaskScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getIoSchedulerName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxIoThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, threadCount, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+ return taskScheduler;
}
+
+ @Provides
+ @Inject
+ @AsyncRepair
+ public RxTaskScheduler getAsyncRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getRepairPoolName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxRepairThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.DROP );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+ return taskScheduler;
+ }
+
+
+ @Provides
+ @Inject
+ @ImportRepair
+ public RxTaskScheduler getImportRepairScheduler( final AsyncEventsSchedulerFig asyncEventsSchedulerFig ) {
+
+ final String poolName = asyncEventsSchedulerFig.getImportSchedulerName();
+ final int threadCount = asyncEventsSchedulerFig.getMaxImportThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory
+ .createTaskExecutor( poolName, threadCount, 1, TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl( executor );
+
+ return taskScheduler;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 16e119c..24ec51f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -153,6 +153,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final EventBuilder eventBuilder,
final MapManagerFactory mapManagerFactory,
final QueueFig queueFig,
+ @EventExecutionScheduler
final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
new file mode 100644
index 0000000..83eb02e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
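+ * Thread counts and pool names for the event execution (SQS), async repair, and import repair schedulers.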
+ */
+@FigSingleton
+public interface AsyncEventsSchedulerFig extends GuicyFig {
+
+
+ /**
+ * Amount of threads to use in async processing
+ */
+ String IO_SCHEDULER_THREADS = "scheduler.io.threads";
+
+
+ /**
+ * Name of pool to use when performing scheduling
+ */
+ String IO_SCHEDULER_NAME = "scheduler.io.poolName";
+
+
+ /**
+ * Number of threads to use in the async repair scheduler pool
+ */
+ String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
+
+
+ /**
+ * Name of the thread pool used by the async repair scheduler
+ */
+ String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
+
+
+ /**
+ * Number of threads to use in the import repair scheduler pool
+ */
+ String IMPORT_SCHEDULER_THREADS = "import.io.threads";
+
+
+ /**
+ * Name of the thread pool used by the import repair scheduler
+ */
+ String IMPORT_SCHEDULER_NAME = "import.io.poolName";
+
+
+ @Default( "100" )
+ @Key( IO_SCHEDULER_THREADS )
+ int getMaxIoThreads();
+
+ @Default( "Usergrid-SQS-Pool" )
+ @Key( IO_SCHEDULER_NAME )
+ String getIoSchedulerName();
+
+
+ @Default( "20" )
+ @Key( REPAIR_SCHEDULER_THREADS )
+ int getMaxRepairThreads();
+
+ @Default( "Usergrid-Repair-Pool" )
+ @Key( REPAIR_SCHEDULER_NAME )
+ String getRepairPoolName();
+
+ @Default( "100" )
+ @Key( IMPORT_SCHEDULER_THREADS )
+ int getMaxImportThreads();
+
+ @Default( "Usergrid-Import-Pool" )
+ @Key( IMPORT_SCHEDULER_NAME )
+ String getImportSchedulerName();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 2bace8d..d65cffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -62,7 +62,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder,
final IndexLocationStrategyFactory indexLocationStrategyFactory,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
new file mode 100644
index 0000000..ce09aae
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventExecutionScheduler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the event execution scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface EventExecutionScheduler {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
index 3d7df3b..3819659 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphCollectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
*/
@Inject
public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @AsyncRepair final RxTaskScheduler rxTaskScheduler,
final EventBuilder eventBuilder,
final AsyncEventService asyncEventService,
@Assisted final String collectionName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
index b2d368b..3c92c03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/ReadGraphConnectionFilter.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.pipeline.read.traverse;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -44,7 +45,7 @@ public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
*/
@Inject
public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory,
- final RxTaskScheduler rxTaskScheduler,
+ @AsyncRepair final RxTaskScheduler rxTaskScheduler,
final EventBuilder eventBuilder,
final AsyncEventService asyncEventService,
@Assisted final String connectionName ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
new file mode 100644
index 0000000..aa2cc12
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AsyncRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the async repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface AsyncRepair {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
new file mode 100644
index 0000000..d65d04c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ImportRepair.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+/**
+ * Label for using the import repair scheduler
+ */
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface ImportRepair {
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
new file mode 100644
index 0000000..ddaa01c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/service/ServiceSchedulerFig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.service;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Thread configuration for service-level entity imports.
+ */
+@FigSingleton
+public interface ServiceSchedulerFig extends GuicyFig {
+
+
+ /**
+ * The number of threads to use when importing entities into result sets
+ */
+ String SERVICE_IMPORT_THREADS = "service.import.threads";
+
+
+
+ @Default("20")
+ @Key( SERVICE_IMPORT_THREADS)
+ int getImportThreads();
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 78c7f37..0a6e270 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.collection.guice;
+import java.util.concurrent.ThreadPoolExecutor;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -25,11 +27,18 @@ import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionSchedulerFig;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provides;
/**
@@ -45,6 +54,7 @@ public abstract class CollectionModule extends AbstractModule {
// noinspection unchecked
install( new GuicyFigModule( SerializationFig.class ) );
+ install( new GuicyFigModule( CollectionSchedulerFig.class ) );
install( new SerializationModule() );
install( new ServiceModule() );
@@ -62,6 +72,26 @@ public abstract class CollectionModule extends AbstractModule {
}
+
+
+ @Provides
+ @Inject
+ @CollectionExecutorScheduler
+ public RxTaskScheduler getRxTaskScheduler( final CollectionSchedulerFig collectionSchedulerFig ){
+
+ final String poolName = collectionSchedulerFig.getIoSchedulerName();
+ final int threadCount = collectionSchedulerFig.getMaxIoThreads();
+
+
+ final ThreadPoolExecutor executor = TaskExecutorFactory.createTaskExecutor( poolName, threadCount, threadCount,
+ TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
+ final RxTaskScheduler taskScheduler = new RxTaskSchedulerImpl(executor );
+
+ return taskScheduler;
+ }
+
+
/**
* Gives callers the ability to to configure an instance of
*
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
deleted file mode 100644
index 53c1f48..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionTaskExecutor.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-
-@BindingAnnotation
-@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
-public @interface CollectionTaskExecutor {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 45cee06..a52ee9c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.scheduler.CollectionExecutorScheduler;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
@@ -74,7 +75,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
- private final EntityCacheFig entityCacheFig;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
@@ -107,7 +107,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
final Keyspace keyspace, final EntityCacheFig entityCacheFig,
- MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
+ final MetricsFactory metricsFactory, @CollectionExecutorScheduler final RxTaskScheduler rxTaskScheduler ) {
this.writeStart = writeStart;
this.writeVerifyUnique = writeVerifyUnique;
@@ -123,7 +123,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
this.keyspace = keyspace;
- this.entityCacheFig = entityCacheFig;
this.metricsFactory = metricsFactory;
this.rxTaskScheduler = rxTaskScheduler;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index cb1515c..d6bbdc5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -63,6 +63,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.codahale.metrics.Timer;
@@ -117,7 +118,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final Timer deleteTimer;
private final Timer fieldIdTimer;
private final Timer fieldEntityTimer;
- private final Timer updateTimer;
private final Timer loadTimer;
private final Timer getLatestTimer;
@@ -165,7 +165,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.delete");
this.fieldIdTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldId");
this.fieldEntityTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.fieldEntity");
- this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.update");
this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.load");
this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "base.latest");
}
@@ -188,8 +187,14 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- final Observable<Entity> write = observable.map( writeCommit ).compose( uniqueCleanup )
- //now extract the ioEvent we need to return
+ final Observable<Entity> write = observable.map( writeCommit )
+ .map(ioEvent -> {
+ //fire this in the background so we don't block writes
+ Observable.just( ioEvent ).compose( uniqueCleanup ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+ return ioEvent;
+ }
+ )
+ //now extract the ioEvent we need to return and update the version
.map( ioEvent -> ioEvent.getEvent().getEntity().get() );
return ObservableTimer.time( write, writeTimer );
@@ -358,7 +363,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
continue;
}
-
//else add it to our result set
response.addEntity( expectedUnique.getField(), entity );
}
@@ -380,6 +384,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
}
+
+
// fire the stages
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 9b1a393..fe3f9a9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -94,7 +94,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
final ApplicationScope applicationScope = ioEvent.getEntityCollection();
//set the version into the entity
- EntityUtils.setVersion( mvccEntity.getEntity().get(), version );
+ final Entity entity = mvccEntity.getEntity().get();
+
+ EntityUtils.setVersion( entity, version );
MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
ValidationUtils.verifyTimeUuid( version ,"version" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
new file mode 100644
index 0000000..8f8aa00
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionExecutorScheduler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface CollectionExecutorScheduler {}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
new file mode 100644
index 0000000..daefa9b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/scheduler/CollectionSchedulerFig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.scheduler;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Configuration (pool name and thread count) for the collection module's task scheduler
+ */
+@FigSingleton
+public interface CollectionSchedulerFig extends GuicyFig {
+
+
+ /**
+ * Number of threads to use in async processing
+ */
+ String COLLECTION_SCHEDULER_THREADS = "scheduler.collection.threads";
+
+
+ /**
+ * Name of pool to use when performing scheduling
+ */
+ String COLLECTION_SCHEDULER_NAME = "scheduler.collection.poolName";
+
+
+ @Default( "20" )
+ @Key( COLLECTION_SCHEDULER_THREADS )
+ int getMaxIoThreads();
+
+ @Default( "Usergrid-Collection-Pool" )
+ @Key( COLLECTION_SCHEDULER_NAME )
+ String getIoSchedulerName();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
index e6c6909..115be99 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerIT.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.collection;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -58,6 +59,7 @@ import rx.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -315,18 +317,17 @@ public class EntityCollectionManagerIT {
@Test
- public void writeAndGetField2X() {
-
-
+ public void writeAndGetField2X() throws InterruptedException {
ApplicationScope collectionScope1 = new ApplicationScopeImpl( new SimpleId( "organization" ) );
- Entity newEntity = new Entity( new SimpleId( "test" ) );
- Field field = new StringField( "testField", "unique", true );
- newEntity.setField( field );
+ final Id entityId = new SimpleId( "test" );
+ Entity firstInstance = new Entity( entityId );
+ Field firstField = new StringField( "testField", "unique", true );
+ firstInstance.setField( firstField );
EntityCollectionManager manager = factory.createCollectionManager( collectionScope1 );
- Observable<Entity> observable = manager.write( newEntity );
+ Observable<Entity> observable = manager.write( firstInstance );
Entity createReturned = observable.toBlocking().lastOrDefault( null );
@@ -334,21 +335,22 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturned.getId() );
assertNotNull( "Version was assigned", createReturned.getVersion() );
- Id id = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
- assertNotNull( id );
- assertEquals( newEntity.getId(), id );
+ final Id existingId = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+ assertNotNull( existingId );
+ assertEquals( firstInstance.getId(), existingId );
Field fieldNull = new StringField( "testFieldNotThere", "uniquely", true );
- id = manager.getIdField( newEntity.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
- assertNull( id );
+ final Id noId = manager.getIdField( firstInstance.getId().getType(), fieldNull ).toBlocking().lastOrDefault( null );
+ assertNull( noId );
//ensure we clean up
- Field fieldSecond = new StringField( "testField", "unique2", true );
- newEntity.setField( fieldSecond );
+ Entity secondInstance = new Entity( entityId );
+ Field secondField = new StringField( firstField.getName(), "unique2", true );
+ secondInstance.setField( secondField );
- Observable<Entity> observableSecond = manager.write( newEntity );
+ Observable<Entity> observableSecond = manager.write( secondInstance );
Entity createReturnedSecond = observableSecond.toBlocking().lastOrDefault( null );
@@ -356,16 +358,27 @@ public class EntityCollectionManagerIT {
assertNotNull( "Id was assigned", createReturnedSecond.getId() );
assertNotNull( "Version was assigned", createReturnedSecond.getVersion() );
- Id idFirst = manager.getIdField( newEntity.getId().getType(), field ).toBlocking().lastOrDefault( null );
+ assertNotEquals( "Versions should not be equal", createReturned.getVersion(), createReturnedSecond.getVersion() );
- assertNull(idFirst);
+ //sanity check, get the entity to ensure it's the right version
- Id idSecond = manager.getIdField( newEntity.getId().getType(), fieldSecond ).toBlocking().lastOrDefault( null );
+ final Entity loadedVersion = manager.load( entityId ).toBlocking().last();
- assertNotNull( idSecond );
- assertEquals( newEntity.getId(), idSecond );
+ assertEquals(entityId, loadedVersion.getId());
+ assertEquals(createReturnedSecond.getVersion(), loadedVersion.getVersion());
+ //give the background unique value cleanup time to run. need to finish the todo below
+ Thread.sleep( 2000 );
+ //TODO, we need to implement verify and repair on this
+ final Id idFirst = manager.getIdField( firstInstance.getId().getType(), firstField ).toBlocking().lastOrDefault( null );
+ assertNull(idFirst);
+
+
+ final Id idSecond = manager.getIdField( secondInstance.getId().getType(), secondField ).toBlocking().lastOrDefault( null );
+
+ assertNotNull( idSecond );
+ assertEquals( secondInstance.getId(), idSecond );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
index 3c6a750..bd3d3e9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -20,30 +20,45 @@
package org.apache.usergrid.persistence.core.executor;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* A task executor that allows you to submit tasks
*/
public class TaskExecutorFactory {
- private static final Logger log = LoggerFactory.getLogger(TaskExecutorFactory.class);
+ private static final Logger log = LoggerFactory.getLogger( TaskExecutorFactory.class );
+
public enum RejectionAction {
+ /**
+ * If there is no capacity left, throw an exception
+ */
ABORT,
- CALLERRUNS
+ /**
+ * If there is no capacity left, the caller runs the callable
+ */
+ CALLERRUNS,
+
+ /**
+ * If there is no capacity left, the request is logged and then silently dropped
+ */
+ DROP
}
+
+
/**
* Create a task executor
- * @param schedulerName
- * @param maxThreadCount
- * @param maxQueueSize
- * @return
*/
public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
final int maxQueueSize, RejectionAction rejectionAction ) {
@@ -52,22 +67,22 @@ public class TaskExecutorFactory {
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
- if(rejectionAction.equals(RejectionAction.ABORT)){
-
+ if ( rejectionAction == RejectionAction.ABORT ) {
return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
-
}
- else if(rejectionAction.equals(RejectionAction.CALLERRUNS)){
+ else if ( rejectionAction == RejectionAction.CALLERRUNS ) {
return new MaxSizeThreadPoolCallerRuns( queue, schedulerName, maxThreadCount );
-
- }else{
- //default to the thread pool with ABORT policy
- return new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
}
-
+ else if ( rejectionAction == RejectionAction.DROP ) {
+ return new MaxSizeThreadPoolDrops( queue, schedulerName, maxThreadCount );
+ }
+ else {
+ throw new IllegalArgumentException( "Unable to create a scheduler with the arguments provided" );
+ }
}
+
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
@@ -78,14 +93,29 @@ public class TaskExecutorFactory {
}
}
+
/**
* Create a thread pool that will implement CallerRunsPolicy if our tasks become overwhelmed
*/
private static final class MaxSizeThreadPoolCallerRuns extends ThreadPoolExecutor {
- public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
- super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue,
- new CountingThreadFactory( poolName ), new RejectedHandler(poolName) );
+ public MaxSizeThreadPoolCallerRuns( final BlockingQueue<Runnable> queue, final String poolName,
+ final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+ new CallerRunsHandler( poolName ) );
+ }
+ }
+
+
+ /**
+ * Create a thread pool that will log and then drop rejected tasks if our tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPoolDrops extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPoolDrops( final BlockingQueue<Runnable> queue, final String poolName,
+ final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ),
+ new DropHandler( poolName ) );
}
}
@@ -111,29 +141,50 @@ public class TaskExecutorFactory {
Thread t = new Thread( r, threadName );
//set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon(true);
+ t.setDaemon( true );
return t;
}
}
+
/**
* The handler that will handle rejected executions and signal the interface
*/
- private static final class RejectedHandler implements RejectedExecutionHandler {
+ private static final class CallerRunsHandler implements RejectedExecutionHandler {
private final String poolName;
- private RejectedHandler (final String poolName) {this.poolName = poolName;}
+
+ private CallerRunsHandler( final String poolName ) {this.poolName = poolName;}
+
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
+ log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r,
+ Thread.currentThread().getName() );
//We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
r.run();
}
+ }
+
+
+ /**
+ * The handler that will log rejected executions and then silently drop the task
+ */
+ private static final class DropHandler implements RejectedExecutionHandler {
+
+ private final String poolName;
+
+ private DropHandler( final String poolName ) {this.poolName = poolName;}
+
+
+ @Override
+ public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
+ log.warn( "{} task queue full, dropping task {}", poolName, r );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index b93ba76..75e2b29 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,11 +19,6 @@
package org.apache.usergrid.persistence.core.guice;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
-import org.apache.usergrid.persistence.core.metrics.MetricsFig;
-import org.apache.usergrid.persistence.core.migration.data.*;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.astyanax.AstyanaxKeyspaceProvider;
@@ -32,14 +27,21 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactoryImpl;
+import org.apache.usergrid.persistence.core.metrics.MetricsFig;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCache;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoCacheImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerImpl;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.rx.RxTaskSchedulerImpl;
import com.google.inject.AbstractModule;
import com.google.inject.Key;
@@ -91,16 +93,11 @@ public class CommonModule extends AbstractModule {
Multibinder.newSetBinder(binder(), MigrationPlugin.class);
- /**
- * RX java scheduler configuration
- */
-
- install(new GuicyFigModule(RxSchedulerFig.class));
install(new GuicyFigModule(ClusterFig.class));
bind(SettingsValidationCluster.class).asEagerSingleton(); //validate props from ClusterFig on startup
- bind(RxTaskScheduler.class).to(RxTaskSchedulerImpl.class);
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
deleted file mode 100644
index 4511518..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxSchedulerFig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- *
- */
-@FigSingleton
-public interface RxSchedulerFig extends GuicyFig {
-
-
- /**
- * The maximum number of threads in the I/O scheduler thread pool. Read via getMaxIoThreads() and used
- * as the pool size
- */
- String IO_SCHEDULER_THREADS = "scheduler.io.threads";
-
-
- /**
- * The name of the I/O scheduler thread pool. Read via getIoSchedulerName() and used as the prefix
- * for the names of the pool's threads
- */
- String IO_SCHEDULER_NAME = "scheduler.io.poolName";
-
- /**
- * The number of threads to use when importing entities into result sets
- */
- String IO_IMPORT_THREADS = "scheduler.import.threads";
-
-
-
-
- @Default( "100" )
- @Key( IO_SCHEDULER_THREADS )
- int getMaxIoThreads();
-
- @Default( "Usergrid-RxIOPool" )
- @Key(IO_SCHEDULER_NAME)
- String getIoSchedulerName();
-
- @Default("20")
- @Key( IO_IMPORT_THREADS)
- int getImportThreads();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
index dce46cb..261cbeb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskSchedulerImpl.java
@@ -20,18 +20,9 @@
package org.apache.usergrid.persistence.core.rx;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -42,29 +33,17 @@ import rx.schedulers.Schedulers;
/**
* An implementation of the task scheduler that allows us to control the number of I/O threads
*/
-@Singleton
public class RxTaskSchedulerImpl implements RxTaskScheduler {
- private static final Logger log = LoggerFactory.getLogger( RxTaskSchedulerImpl.class );
-
private final Scheduler scheduler;
- private final String poolName;
@Inject
- public RxTaskSchedulerImpl(final RxSchedulerFig schedulerFig){
-
- this.poolName = schedulerFig.getIoSchedulerName();
-
- final int poolSize = schedulerFig.getMaxIoThreads();
-
-
- final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(poolSize);
-
+ public RxTaskSchedulerImpl(final ThreadPoolExecutor executor){
- final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, poolSize );
+ Preconditions.checkNotNull( executor , "executor must not be null");
- this.scheduler = Schedulers.from(threadPool);
+ this.scheduler = Schedulers.from(executor);
}
@@ -76,56 +55,4 @@ public class RxTaskSchedulerImpl implements RxTaskScheduler {
}
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final int maxPoolSize ) {
-
- super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ), new RejectedHandler() );
- }
- }
-
-
- /**
- * Thread factory that will name and count threads for easier debugging
- */
- private final class CountingThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- final String threadName = poolName + "-" + newValue;
-
- Thread t = new Thread( r, threadName );
-
- //set it to be a daemon thread so it doesn't block shutdown
- t.setDaemon( true );
-
- return t;
- }
- }
-
-
- /**
- * The handler that will handle rejected executions and signal the interface
- */
- private final class RejectedHandler implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- log.warn( "{} task queue full, rejecting task {} and running in thread {}", poolName, r, Thread.currentThread().getName() );
-
- //We've decided we want to have a "caller runs" policy, to just invoke the task when rejected
-
- r.run();
- }
-
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0e1f0e64/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
index d032589..662370f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/AbstractService.java
@@ -27,6 +27,10 @@ import java.util.Set;
import java.util.UUID;
import com.codahale.metrics.Timer;
+
+import org.apache.usergrid.corepersistence.rx.impl.AsyncRepair;
+import org.apache.usergrid.corepersistence.rx.impl.ImportRepair;
+import org.apache.usergrid.corepersistence.service.ServiceSchedulerFig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.slf4j.Logger;
@@ -42,7 +46,6 @@ import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
-import org.apache.usergrid.persistence.core.rx.RxSchedulerFig;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import org.apache.usergrid.services.ServiceParameter.IdParameter;
@@ -54,6 +57,7 @@ import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import org.apache.usergrid.services.exceptions.UnsupportedServiceOperationException;
import com.google.inject.Injector;
+import com.google.inject.Key;
import rx.Observable;
import rx.Scheduler;
@@ -100,7 +104,7 @@ public abstract class AbstractService implements Service {
protected Map<String, Object> defaultEntityMetadata;
private Scheduler rxScheduler;
- private RxSchedulerFig rxSchedulerFig;
+ private ServiceSchedulerFig rxSchedulerFig;
private MetricsFactory metricsFactory;
private Timer entityGetTimer;
private Timer entitiesGetTimer;
@@ -117,8 +121,8 @@ public abstract class AbstractService implements Service {
this.sm = sm;
em = sm.getEntityManager();
final Injector injector = sm.getApplicationContext().getBean( Injector.class );
- rxScheduler = injector.getInstance( RxTaskScheduler.class ).getAsyncIOScheduler();
- rxSchedulerFig = injector.getInstance(RxSchedulerFig.class);
+ rxScheduler = injector.getInstance( Key.get(RxTaskScheduler.class, ImportRepair.class)).getAsyncIOScheduler();
+ rxSchedulerFig = injector.getInstance(ServiceSchedulerFig.class );
metricsFactory = injector.getInstance(MetricsFactory.class);
this.entityGetTimer = metricsFactory.getTimer(this.getClass(), "importEntity.get");
this.entitiesGetTimer = metricsFactory.getTimer(this.getClass(), "importEntities.get");