You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/11 03:03:09 UTC

incubator-usergrid git commit: Refactor of InMemoryAsyncEventService into EventBuilderImpl. All async scheduling mechanisms will need to invoke this central logic flow

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 2e5937b03 -> 3f64b29a4


Refactor of InMemoryAsyncEventService into EventBuilderImpl.  All async scheduling mechanisms will need to invoke this central logic flow


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f64b29a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f64b29a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f64b29a

Branch: refs/heads/two-dot-o-dev
Commit: 3f64b29a44a7e192b5866ce35565b91ea6a838f8
Parents: 2e5937b
Author: Todd Nine <tn...@apigee.com>
Authored: Sun May 10 19:02:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 19:02:57 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   9 +-
 .../asyncevents/AsyncIndexProvider.java         |   8 +-
 .../asyncevents/EventBuilder.java               | 105 +++++++++++++
 .../asyncevents/EventBuilderImpl.java           | 154 +++++++++++++++++++
 .../asyncevents/InMemoryAsyncEventService.java  |  59 +++----
 .../index/InMemoryAsycIndexServiceTest.java     |   5 +-
 6 files changed, 293 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index f1e1596..a02bffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -20,6 +20,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
@@ -132,9 +134,14 @@ public class CoreModule  extends AbstractModule {
 
 
         bind( IndexService.class ).to( IndexServiceImpl.class );
+
+        //bind the event handlers
+        bind( EventBuilder.class).to( EventBuilderImpl.class );
+
         //bind the queue provider
+        bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
+
 
-        bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class );
 
         install( new GuicyFigModule( IndexProcessorFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 9f801b4..ec968af 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -45,6 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EventBuilder eventBuilder;
 
     private AsyncEventService asyncEventService;
 
@@ -53,13 +54,15 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
                                final MetricsFactory metricsFactory, final IndexService indexService,
                                final RxTaskScheduler rxTaskScheduler,
-                               final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder ) {
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.eventBuilder = eventBuilder;
     }
 
 
@@ -82,8 +85,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncEventService( indexService, rxTaskScheduler,
-                    entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously());
+                return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
             case SQS:
                 return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
                     entityCollectionManagerFactory, rxTaskScheduler );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
new file mode 100644
index 0000000..f48451c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for constructing the observable streams that perform asynchronous events
+ */
+public interface EventBuilder {
+    /**
+     * Return the cold observable of entity index update operations
+     * @param applicationScope the application scope the entity is indexed in
+     * @param entity the entity to index
+     * @return an observable of the resulting index operation messages
+     */
+    Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+
+    /**
+     * Return the cold observable of the new edge operation
+     * @param applicationScope the application scope the edge belongs to
+     * @param entity the entity associated with the new edge
+     * @param newEdge the edge that was added
+     * @return an observable of the resulting index operation messages
+     */
+    Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+
+    /**
+     * Return the cold observable of the deleted edge operations
+     * @param applicationScope the application scope the edge belongs to
+     * @param edge the edge being deleted
+     * @return an observable of the resulting index operation messages
+     */
+    Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+
+    /**
+     * Return a bean with 2 observable streams for entity delete.
+     * @param applicationScope the application scope the entity belongs to
+     * @param entityId the id of the entity being deleted
+     * @return the holder of both streams; see {@link EntityDeleteResults} for the expected subscription order
+     */
+    EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+
+    /**
+     * Re-index an entity in the scope provided
+     * @param entityIdScope the entity id together with its application scope
+     * @return an observable of the resulting index operation messages
+     */
+    Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+
+    /**
+     * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that
+     * getIndexObservable() should be subscribed and completed BEFORE getEntitiesCompacted() is subscribed
+     */
+    final class EntityDeleteResults {
+        private final Observable<IndexOperationMessage> indexOperationMessageObservable;
+        private final Observable<List<MvccLogEntry>> entitiesCompacted;
+
+        /** Stores the two streams as given; neither is subscribed to here. */
+        public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
+                                    final Observable<List<MvccLogEntry>> entitiesCompacted ) {
+            this.indexOperationMessageObservable = indexOperationMessageObservable;
+            this.entitiesCompacted = entitiesCompacted;
+        }
+
+        /** @return the stream of index operation messages passed to the constructor */
+        public Observable<IndexOperationMessage> getIndexObservable() {
+            return indexOperationMessageObservable;
+        }
+
+        /** @return the stream of log entry batches passed to the constructor */
+        public Observable<List<MvccLogEntry>> getEntitiesCompacted() {
+            return entitiesCompacted;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
new file mode 100644
index 0000000..c0d82d2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Builds the observable event flows; AsyncEventService implementations decide how to schedule and subscribe to them
+ */
+@Singleton
+public class EventBuilderImpl implements EventBuilder {
+
+    private static final Logger log = LoggerFactory.getLogger( EventBuilderImpl.class );
+
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+    private final SerializationFig serializationFig;
+
+
+    @Inject
+    public EventBuilderImpl( final IndexService indexService,
+                             final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                             final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) {
+        this.indexService = indexService;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+        this.serializationFig = serializationFig;
+    }
+
+    /**
+     * Builds the observable that indexes the entity in the given application scope.
+     */
+    @Override
+    public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+                                                                     final Entity entity ) {
+
+        //this method only assembles the stream; subscribing (and therefore scheduling) is left to the caller
+
+        log.debug( "Indexing  in app scope {} entity {}", applicationScope, entity );
+
+        return indexService.indexEntity( applicationScope, entity );
+    }
+
+    /**
+     * Builds the observable that indexes the entity for the newly added edge.
+     */
+    @Override
+    public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+                                                           final Edge newEdge ) {
+
+        log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
+            new Object[] { applicationScope, entity, newEdge } );
+
+        return indexService.indexEdge( applicationScope, entity, newEdge );
+    }
+
+    /**
+     * Builds the observable that removes the edge from the index and, once that stream completes, invokes
+     * the graph manager's deleteEdge for the same edge.
+     */
+    @Override
+    public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
+                                                              final Edge edge ) {
+        log.debug( "Deleting in app scope {} with edge {}", applicationScope, edge );
+
+        return indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> {
+            final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+            //NOTE(review): the result of deleteEdge is discarded; if it is a lazy Observable it is never
+            //subscribed and the edge would not be removed from the graph -- confirm against GraphManager
+            gm.deleteEdge( edge );
+        } );
+    }
+
+    /**
+     * Builds the two streams for an entity delete: the index removals and the buffered removal of its versions.
+     */
+    @Override
+    public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+        log.debug( "Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId );
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+
+        //observable of index operation messages
+        final Observable<IndexOperationMessage> edgeObservable =
+            indexService.deleteEntityIndexes( applicationScope, entityId );
+
+        //observable of entries as the batches are deleted
+        //NOTE(review): the results of ecm.delete and gm.compactNode are discarded; if they are lazy Observables
+        //they are never subscribed -- confirm against EntityCollectionManager and GraphManager
+        final Observable<List<MvccLogEntry>> entries =
+            ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+
+        return new EntityDeleteResults( edgeObservable, entries );
+    }
+
+    /**
+     * Builds the observable that loads the entity identified by the scope and re-indexes it.
+     */
+    @Override
+    public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final Id entityId = entityIdScope.getId();
+
+        //load the entity
+        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+            //index each loaded entity; scheduling and subscription are the caller's responsibility
+            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 8d0e8c3..6faa695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,13 +23,10 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.index.IndexService;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -39,24 +36,27 @@ import com.google.inject.Singleton;
 import rx.Observable;
 
 
+/**
+ * TODO refactor this implementation into another class. The AsyncEventService impl will then invoke this class
+ *
+ * Performs in memory asynchronous execution using a task scheduler to limit throughput via RX.
+ */
 @Singleton
 public class InMemoryAsyncEventService implements AsyncEventService {
 
     private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class );
 
-    private final IndexService indexService;
+    private final EventBuilder eventBuilder;
     private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final boolean resolveSynchronously;
 
 
+
     @Inject
-    public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler,
-                                      final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                      boolean resolveSynchronously ) {
-        this.indexService = indexService;
+    public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean
+        resolveSynchronously ) {
+        this.eventBuilder = eventBuilder;
         this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.resolveSynchronously = resolveSynchronously;
     }
 
@@ -68,62 +68,39 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
 
 
-        log.debug( "Indexing  in app scope {} entity {}", entity, applicationScope );
-
-        final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity );
-
-
-        run( edgeObservable );
+        run( eventBuilder.queueEntityIndexUpdate( applicationScope, entity ) );
     }
 
 
     @Override
     public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-
-        log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
-            new Object[] { entity, applicationScope, newEdge } );
-
-        final Observable<IndexOperationMessage> edgeObservable =  indexService.indexEdge( applicationScope, entity, newEdge );
-
-        run( edgeObservable );
+        run( eventBuilder.queueNewEdge( applicationScope, entity, newEdge ) );
     }
 
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
-        log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
-        final Observable<IndexOperationMessage> edgeObservable = indexService.deleteIndexEdge( applicationScope, edge );
-
-        run( edgeObservable );
+        run( eventBuilder.queueDeleteEdge( applicationScope, edge ) );
     }
 
 
     @Override
     public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
-        log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
-
-        final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteEntityIndexes( applicationScope, entityId );
 
-        //TODO chain graph operations here
+        final EventBuilderImpl.EntityDeleteResults results =
+            eventBuilder.queueEntityDelete( applicationScope, entityId );
 
-        run( edgeObservable );
+        run( results.getIndexObservable() );
+        run( results.getEntitiesCompacted() );
     }
 
 
     @Override
     public void index( final EntityIdScope entityIdScope ) {
 
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-
-        final Id entityId = entityIdScope.getId();
 
-        //load the entity
-        entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
-            //perform indexing on the task scheduler and start it
-            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) )
-            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+        run(eventBuilder.index( entityIdScope ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index 2860c89..77d7cab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -25,6 +25,7 @@ import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService;
 import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -46,7 +47,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
 
 
     @Inject
-    public IndexService indexService;
+    public EventBuilder eventBuilder;
 
     @Inject
     public RxTaskScheduler rxTaskScheduler;
@@ -54,7 +55,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new InMemoryAsyncEventService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false  );
+        return  new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false  );
     }