You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:25 UTC

[11/14] incubator-usergrid git commit: Refactor of InMemoryAsyncEventService into EventBuilderImpl. All async scheduling mechanisms will need to invoke this central logic flow

Refactor of InMemoryAsyncEventService into EventBuilderImpl.  All async scheduling mechanisms will need to invoke this central logic flow.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f64b29a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f64b29a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f64b29a

Branch: refs/heads/USERGRID-641
Commit: 3f64b29a44a7e192b5866ce35565b91ea6a838f8
Parents: 2e5937b
Author: Todd Nine <tn...@apigee.com>
Authored: Sun May 10 19:02:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 19:02:57 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   9 +-
 .../asyncevents/AsyncIndexProvider.java         |   8 +-
 .../asyncevents/EventBuilder.java               | 105 +++++++++++++
 .../asyncevents/EventBuilderImpl.java           | 154 +++++++++++++++++++
 .../asyncevents/InMemoryAsyncEventService.java  |  59 +++----
 .../index/InMemoryAsycIndexServiceTest.java     |   5 +-
 6 files changed, 293 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index f1e1596..a02bffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -20,6 +20,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
@@ -132,9 +134,14 @@ public class CoreModule  extends AbstractModule {
 
 
         bind( IndexService.class ).to( IndexServiceImpl.class );
+
+        //bind the event handlers
+        bind( EventBuilder.class).to( EventBuilderImpl.class );
+
         //bind the queue provider
+        bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
+
 
-        bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class );
 
         install( new GuicyFigModule( IndexProcessorFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 9f801b4..ec968af 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -45,6 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final EventBuilder eventBuilder;
 
     private AsyncEventService asyncEventService;
 
@@ -53,13 +54,15 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
                                final MetricsFactory metricsFactory, final IndexService indexService,
                                final RxTaskScheduler rxTaskScheduler,
-                               final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder ) {
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
         this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.eventBuilder = eventBuilder;
     }
 
 
@@ -82,8 +85,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
         switch ( impl ) {
             case LOCAL:
-                return new InMemoryAsyncEventService( indexService, rxTaskScheduler,
-                    entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously());
+                return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
             case SQS:
                 return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
                     entityCollectionManagerFactory, rxTaskScheduler );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
new file mode 100644
index 0000000..f48451c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for constructing an observable stream to perform asynchronous events
+ */
+public interface EventBuilder {
+    /**
+     * Return the cold observable of entity index update operations
+     * @param applicationScope
+     * @param entity
+     * @return
+     */
+    Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+
+    /**
+     * Return the cold observable of the new edge operation
+     * @param applicationScope
+     * @param entity
+     * @param newEdge
+     * @return
+     */
+    Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+
+    /**
+     * Return the cold observable of the deleted edge operations
+     * @param applicationScope
+     * @param edge
+     * @return
+     */
+    Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+
+    /**
+     * Return a bean with 2 observable streams for entity delete.
+     * @param applicationScope
+     * @param entityId
+     * @return
+     */
+    EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+
+    /**
+     * Re-index an entity in the scope provided
+     * @param entityIdScope
+     * @return
+     */
+    Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+
+    /**
+     * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that
+     * getIndexObservable() should be subscribed and completed BEFORE getEntitiesCompacted() is subscribed
+     */
+    final class EntityDeleteResults {
+        private final Observable<IndexOperationMessage> indexOperationMessageObservable;
+        private final Observable<List<MvccLogEntry>> entitiesCompacted;
+
+
+        public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
+                                    final Observable<List<MvccLogEntry>> entitiesCompacted ) {
+            this.indexOperationMessageObservable = indexOperationMessageObservable;
+            this.entitiesCompacted = entitiesCompacted;
+        }
+
+
+        public Observable<IndexOperationMessage> getIndexObservable() {
+            return indexOperationMessageObservable;
+        }
+
+
+        public Observable<List<MvccLogEntry>> getEntitiesCompacted() {
+            return entitiesCompacted;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
new file mode 100644
index 0000000..c0d82d2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Builds the observable streams for async events; subscribing to them is left to the caller
+ */
+@Singleton
+public class EventBuilderImpl implements EventBuilder {
+
+    private static final Logger log = LoggerFactory.getLogger( EventBuilderImpl.class );
+
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final GraphManagerFactory graphManagerFactory;
+    private final SerializationFig serializationFig;
+
+
+    @Inject
+    public EventBuilderImpl( final IndexService indexService,
+                             final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                             final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) {
+        this.indexService = indexService;
+        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.graphManagerFactory = graphManagerFactory;
+        this.serializationFig = serializationFig;
+    }
+
+
+    @Override
+    public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+                                                                     final Entity entity ) {
+
+        //process the entity immediately
+        //only process the same version, otherwise ignore
+
+
+        log.debug( "Indexing  in app scope {} entity {}", entity, applicationScope );
+
+        final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity );
+
+        return edgeObservable;
+    }
+
+
+    @Override
+    public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+                                                           final Edge newEdge ) {
+
+        log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
+            new Object[] { entity, applicationScope, newEdge } );
+
+        final Observable<IndexOperationMessage> edgeObservable =
+            indexService.indexEdge( applicationScope, entity, newEdge );
+
+        return edgeObservable;
+    }
+
+
+    @Override
+    public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
+                                                              final Edge edge ) {
+        log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
+
+        final Observable<IndexOperationMessage> edgeObservable =
+            indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> {
+                final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+                gm.deleteEdge( edge );
+            } );
+
+        return edgeObservable;
+    }
+
+
+    @Override
+    public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+        log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+
+
+        //observable of index operation messages
+        final Observable<IndexOperationMessage> edgeObservable =
+            indexService.deleteEntityIndexes( applicationScope, entityId );
+
+
+        //observable of entries as the batches are deleted
+        final Observable<List<MvccLogEntry>> entries =
+            ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+
+
+        return new EntityDeleteResults( edgeObservable, entries );
+    }
+
+
+    @Override
+    public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+
+        final Id entityId = entityIdScope.getId();
+
+        //load the entity
+        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+            //index the loaded entity; scheduling and subscribing are left to the caller
+            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 8d0e8c3..6faa695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,13 +23,10 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.index.IndexService;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -39,24 +36,27 @@ import com.google.inject.Singleton;
 import rx.Observable;
 
 
+/**
+ * TODO refactor this implementation into another class. The AsyncEventService impl will then invoke this class
+ *
+ * Performs in memory asynchronous execution using a task scheduler to limit throughput via RX.
+ */
 @Singleton
 public class InMemoryAsyncEventService implements AsyncEventService {
 
     private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class );
 
-    private final IndexService indexService;
+    private final EventBuilder eventBuilder;
     private final RxTaskScheduler rxTaskScheduler;
-    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final boolean resolveSynchronously;
 
 
+
     @Inject
-    public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler,
-                                      final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                      boolean resolveSynchronously ) {
-        this.indexService = indexService;
+    public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean
+        resolveSynchronously ) {
+        this.eventBuilder = eventBuilder;
         this.rxTaskScheduler = rxTaskScheduler;
-        this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.resolveSynchronously = resolveSynchronously;
     }
 
@@ -68,62 +68,39 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
 
 
-        log.debug( "Indexing  in app scope {} entity {}", entity, applicationScope );
-
-        final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity );
-
-
-        run( edgeObservable );
+        run( eventBuilder.queueEntityIndexUpdate( applicationScope, entity ) );
     }
 
 
     @Override
     public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-
-        log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
-            new Object[] { entity, applicationScope, newEdge } );
-
-        final Observable<IndexOperationMessage> edgeObservable =  indexService.indexEdge( applicationScope, entity, newEdge );
-
-        run( edgeObservable );
+        run( eventBuilder.queueNewEdge( applicationScope, entity, newEdge ) );
     }
 
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
-        log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
-        final Observable<IndexOperationMessage> edgeObservable = indexService.deleteIndexEdge( applicationScope, edge );
-
-        run( edgeObservable );
+        run( eventBuilder.queueDeleteEdge( applicationScope, edge ) );
     }
 
 
     @Override
     public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
-        log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
-
-        final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteEntityIndexes( applicationScope, entityId );
 
-        //TODO chain graph operations here
+        final EventBuilderImpl.EntityDeleteResults results =
+            eventBuilder.queueEntityDelete( applicationScope, entityId );
 
-        run( edgeObservable );
+        run( results.getIndexObservable() );
+        run( results.getEntitiesCompacted() );
     }
 
 
     @Override
     public void index( final EntityIdScope entityIdScope ) {
 
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-
-        final Id entityId = entityIdScope.getId();
 
-        //load the entity
-        entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
-            //perform indexing on the task scheduler and start it
-            .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) )
-            .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+        run(eventBuilder.index( entityIdScope ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index 2860c89..77d7cab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -25,6 +25,7 @@ import org.junit.runner.RunWith;
 
 import org.apache.usergrid.corepersistence.TestIndexModule;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService;
 import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -46,7 +47,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
 
 
     @Inject
-    public IndexService indexService;
+    public EventBuilder eventBuilder;
 
     @Inject
     public RxTaskScheduler rxTaskScheduler;
@@ -54,7 +55,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new InMemoryAsyncEventService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false  );
+        return  new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false  );
     }