You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/11 03:03:09 UTC
incubator-usergrid git commit: Refactor of InMemoryAsyncEventService
into EventBuilderImpl. All asyc scheduling mechanisms,
will need to invoke this central logic flow
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 2e5937b03 -> 3f64b29a4
Refactor of InMemoryAsyncEventService into EventBuilderImpl. All asyc scheduling mechanisms, will need to invoke this central logic flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f64b29a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f64b29a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f64b29a
Branch: refs/heads/two-dot-o-dev
Commit: 3f64b29a44a7e192b5866ce35565b91ea6a838f8
Parents: 2e5937b
Author: Todd Nine <tn...@apigee.com>
Authored: Sun May 10 19:02:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 19:02:57 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 9 +-
.../asyncevents/AsyncIndexProvider.java | 8 +-
.../asyncevents/EventBuilder.java | 105 +++++++++++++
.../asyncevents/EventBuilderImpl.java | 154 +++++++++++++++++++
.../asyncevents/InMemoryAsyncEventService.java | 59 +++----
.../index/InMemoryAsycIndexServiceTest.java | 5 +-
6 files changed, 293 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index f1e1596..a02bffd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -20,6 +20,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
@@ -132,9 +134,14 @@ public class CoreModule extends AbstractModule {
bind( IndexService.class ).to( IndexServiceImpl.class );
+
+ //bind the event handlers
+ bind( EventBuilder.class).to( EventBuilderImpl.class );
+
//bind the queue provider
+ bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
+
- bind( AsyncEventService.class).toProvider( AsyncIndexProvider.class );
install( new GuicyFigModule( IndexProcessorFig.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 9f801b4..ec968af 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -45,6 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final IndexService indexService;
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EventBuilder eventBuilder;
private AsyncEventService asyncEventService;
@@ -53,13 +54,15 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory, final IndexService indexService,
final RxTaskScheduler rxTaskScheduler,
- final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EventBuilder eventBuilder ) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
this.metricsFactory = metricsFactory;
this.indexService = indexService;
this.rxTaskScheduler = rxTaskScheduler;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.eventBuilder = eventBuilder;
}
@@ -82,8 +85,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
switch ( impl ) {
case LOCAL:
- return new InMemoryAsyncEventService( indexService, rxTaskScheduler,
- entityCollectionManagerFactory, indexProcessorFig.resolveSynchronously());
+ return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
case SQS:
return new SQSAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
entityCollectionManagerFactory, rxTaskScheduler );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
new file mode 100644
index 0000000..f48451c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+
+
+/**
+ * Interface for constructing an observable stream to perform asynchonous events
+ */
+public interface EventBuilder {
+ /**
+ * Return the cold observable of entity index update operations
+ * @param applicationScope
+ * @param entity
+ * @return
+ */
+ Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+
+ /**
+ * Return the cold observable of the new edge operation
+ * @param applicationScope
+ * @param entity
+ * @param newEdge
+ * @return
+ */
+ Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+
+ /**
+ * Return the cold observable of the deleted edge operations
+ * @param applicationScope
+ * @param edge
+ * @return
+ */
+ Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+
+ /**
+ * Return a ben with 2 obervable streams for entity delete.
+ * @param applicationScope
+ * @param entityId
+ * @return
+ */
+ EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+
+ /**
+ * Re-index an entity in the scope provided
+ * @param entityIdScope
+ * @return
+ */
+ Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+
+ /**
+ * A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
+ * indexOperationMessages should be subscribed and completed BEFORE the getEntitiesCompacted is subscribed
+ */
+ final class EntityDeleteResults {
+ private final Observable<IndexOperationMessage> indexOperationMessageObservable;
+ private final Observable<List<MvccLogEntry>> entitiesCompacted;
+
+
+ public EntityDeleteResults( final Observable<IndexOperationMessage> indexOperationMessageObservable,
+ final Observable<List<MvccLogEntry>> entitiesCompacted ) {
+ this.indexOperationMessageObservable = indexOperationMessageObservable;
+ this.entitiesCompacted = entitiesCompacted;
+ }
+
+
+ public Observable<IndexOperationMessage> getIndexObservable() {
+ return indexOperationMessageObservable;
+ }
+
+
+ public Observable<List<MvccLogEntry>> getEntitiesCompacted() {
+ return entitiesCompacted;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
new file mode 100644
index 0000000..c0d82d2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+
+
+/**
+ * Service that executes event flows
+ */
+@Singleton
+public class EventBuilderImpl implements EventBuilder {
+
+ private static final Logger log = LoggerFactory.getLogger( EventBuilderImpl.class );
+
+ private final IndexService indexService;
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
+ private final SerializationFig serializationFig;
+
+
+ @Inject
+ public EventBuilderImpl( final IndexService indexService,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final GraphManagerFactory graphManagerFactory, final SerializationFig serializationFig ) {
+ this.indexService = indexService;
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
+ this.serializationFig = serializationFig;
+ }
+
+
+ @Override
+ public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+ final Entity entity ) {
+
+ //process the entity immediately
+ //only process the same version, otherwise ignore
+
+
+ log.debug( "Indexing in app scope {} entity {}", entity, applicationScope );
+
+ final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity );
+
+ return edgeObservable;
+ }
+
+
+ @Override
+ public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+ final Edge newEdge ) {
+
+ log.debug( "Indexing in app scope {} with entity {} and new edge {}",
+ new Object[] { entity, applicationScope, newEdge } );
+
+ final Observable<IndexOperationMessage> edgeObservable =
+ indexService.indexEdge( applicationScope, entity, newEdge );
+
+ return edgeObservable;
+ }
+
+
+ @Override
+ public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
+ final Edge edge ) {
+ log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
+
+ final Observable<IndexOperationMessage> edgeObservable =
+ indexService.deleteIndexEdge( applicationScope, edge ).doOnCompleted( () -> {
+ final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+ gm.deleteEdge( edge );
+ } );
+
+ return edgeObservable;
+ }
+
+
+ @Override
+ public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+ log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
+
+
+ //observable of index operation messages
+ final Observable<IndexOperationMessage> edgeObservable =
+ indexService.deleteEntityIndexes( applicationScope, entityId );
+
+
+ //observable of entries as the batches are deleted
+ final Observable<List<MvccLogEntry>> entries =
+ ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
+ .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+
+
+ return new EntityDeleteResults( edgeObservable, entries );
+ }
+
+
+ @Override
+ public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+
+ final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+
+ final Id entityId = entityIdScope.getId();
+
+ //load the entity
+ return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+ //perform indexing on the task scheduler and start it
+ .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 8d0e8c3..6faa695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,13 +23,10 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.index.IndexService;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -39,24 +36,27 @@ import com.google.inject.Singleton;
import rx.Observable;
+/**
+ * TODO refactor this implementation into another class. The AsyncEventService impl will then invoke this class
+ *
+ * Performs in memory asynchronous execution using a task scheduler to limit throughput via RX.
+ */
@Singleton
public class InMemoryAsyncEventService implements AsyncEventService {
private static final Logger log = LoggerFactory.getLogger( InMemoryAsyncEventService.class );
- private final IndexService indexService;
+ private final EventBuilder eventBuilder;
private final RxTaskScheduler rxTaskScheduler;
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final boolean resolveSynchronously;
+
@Inject
- public InMemoryAsyncEventService( final IndexService indexService, final RxTaskScheduler rxTaskScheduler,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- boolean resolveSynchronously ) {
- this.indexService = indexService;
+ public InMemoryAsyncEventService( final EventBuilder eventBuilder, final RxTaskScheduler rxTaskScheduler, boolean
+ resolveSynchronously ) {
+ this.eventBuilder = eventBuilder;
this.rxTaskScheduler = rxTaskScheduler;
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.resolveSynchronously = resolveSynchronously;
}
@@ -68,62 +68,39 @@ public class InMemoryAsyncEventService implements AsyncEventService {
//only process the same version, otherwise ignore
- log.debug( "Indexing in app scope {} entity {}", entity, applicationScope );
-
- final Observable<IndexOperationMessage> edgeObservable = indexService.indexEntity( applicationScope, entity );
-
-
- run( edgeObservable );
+ run( eventBuilder.queueEntityIndexUpdate( applicationScope, entity ) );
}
@Override
public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-
- log.debug( "Indexing in app scope {} with entity {} and new edge {}",
- new Object[] { entity, applicationScope, newEdge } );
-
- final Observable<IndexOperationMessage> edgeObservable = indexService.indexEdge( applicationScope, entity, newEdge );
-
- run( edgeObservable );
+ run( eventBuilder.queueNewEdge( applicationScope, entity, newEdge ) );
}
@Override
public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
- log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
- final Observable<IndexOperationMessage> edgeObservable = indexService.deleteIndexEdge( applicationScope, edge );
-
- run( edgeObservable );
+ run( eventBuilder.queueDeleteEdge( applicationScope, edge ) );
}
@Override
public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
- log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
-
- final Observable<IndexOperationMessage> edgeObservable =
- indexService.deleteEntityIndexes( applicationScope, entityId );
- //TODO chain graph operations here
+ final EventBuilderImpl.EntityDeleteResults results =
+ eventBuilder.queueEntityDelete( applicationScope, entityId );
- run( edgeObservable );
+ run( results.getIndexObservable() );
+ run( results.getEntitiesCompacted() );
}
@Override
public void index( final EntityIdScope entityIdScope ) {
- final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
-
- final Id entityId = entityIdScope.getId();
- //load the entity
- entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
- //perform indexing on the task scheduler and start it
- .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) )
- .subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
+ run(eventBuilder.index( entityIdScope ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f64b29a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
index 2860c89..77d7cab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/InMemoryAsycIndexServiceTest.java
@@ -25,6 +25,7 @@ import org.junit.runner.RunWith;
import org.apache.usergrid.corepersistence.TestIndexModule;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.corepersistence.asyncevents.InMemoryAsyncEventService;
import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
@@ -46,7 +47,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
@Inject
- public IndexService indexService;
+ public EventBuilder eventBuilder;
@Inject
public RxTaskScheduler rxTaskScheduler;
@@ -54,7 +55,7 @@ public class InMemoryAsycIndexServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new InMemoryAsyncEventService( indexService, rxTaskScheduler, entityCollectionManagerFactory,false );
+ return new InMemoryAsyncEventService( eventBuilder, rxTaskScheduler, false );
}