You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/04 20:30:29 UTC
incubator-usergrid git commit: Updates amazon async service wiring to
wire to event builder
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-900 [created] d67c220dd
Updates amazon async service wiring to wire to event builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d67c220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d67c220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d67c220d
Branch: refs/heads/USERGRID-900
Commit: d67c220dd7221131a5c7bbcfebcd60f78cddffb1
Parents: b9d2d35
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 12:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 12:30:27 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 132 +++++++++----------
.../asyncevents/AsyncIndexProvider.java | 11 +-
.../asyncevents/EventBuilder.java | 11 +-
.../asyncevents/EventBuilderImpl.java | 12 +-
.../asyncevents/InMemoryAsyncEventService.java | 14 +-
.../asyncevents/model/EntityIndexEvent.java | 16 ++-
.../index/AmazonAsyncEventServiceTest.java | 7 +-
7 files changed, 107 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ec43ab7..0429af3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
-import org.apache.usergrid.corepersistence.CpEntityManager;
+
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.utils.UUIDUtils;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,15 +79,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final QueueManager queue;
private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
- private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
+ private final EventBuilder eventBuilder;
+ private final RxTaskScheduler rxTaskScheduler;
private final Timer readTimer;
private final Timer writeTimer;
private final Timer ackTimer;
+ /**
+ * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+ */
private final Object mutex = new Object();
private final Counter indexErrorCounter;
@@ -99,19 +104,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Inject
- public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig,
- final MetricsFactory metricsFactory,
- final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory
- ) {
-
- this.indexService = indexService;
+ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
+ final MetricsFactory metricsFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+ final EventBuilder eventBuilder,
+ final RxTaskScheduler rxTaskScheduler ) {
+
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
+ this.eventBuilder = eventBuilder;
+ this.rxTaskScheduler = rxTaskScheduler;
this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -144,7 +147,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
try {
//signal to SQS
- this.queue.sendMessage(operation);
+ this.queue.sendMessage( operation );
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -157,7 +160,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
try {
//signal to SQS
- this.queue.sendMessages(operations);
+ this.queue.sendMessages( operations );
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -189,33 +192,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Ack message in SQS
*/
- public void ack(final List<QueueMessage> messages) {
-
- final Timer.Context timer = this.ackTimer.time();
-
- try{
- // no op
- if (messages.size() == 0) {
- return;
- }
- queue.commitMessages(messages);
-
- //decrement our in-flight counter
- inFlight.addAndGet(-1 * messages.size());
-
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }
- finally {
- timer.stop();
- }
-
-
- }
-
- /**
- * Ack message in SQS
- */
public void ack(final QueueMessage message) {
final Timer.Context timer = this.ackTimer.time();
@@ -290,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity) {
- offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId())));
+ offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0));
}
@@ -298,7 +274,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
- final AsyncEvent event = (AsyncEvent) message.getBody();
+ final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
@@ -307,13 +283,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
//only process the same version, otherwise ignore
final EntityIdScope entityIdScope = event.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+ final Id entityId = entityIdScope.getId();
+ final long updatedAfter = event.getUpdatedAfter();
+
+ final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+ final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
- ecm.load(entityIdScope.getId())
- .first()
- .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
- .doOnNext(ignore -> ack(message)).subscribe();
+ subscibeAndAck( observable, message );
}
@@ -324,7 +301,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge);
- offer(operation);
+ offer( operation );
}
public void handleEdgeIndex(final QueueMessage message) {
@@ -339,18 +316,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
final ApplicationScope applicationScope = event.getApplicationScope();
final Edge edge = event.getEdge();
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
- ecm.load(event.getEntityId())
- .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
- .doOnNext(ignore -> ack(message)).subscribe();
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+ applicationScope, entity, edge ) );
+
+ subscibeAndAck( edgeIndexObservable, message );
}
@Override
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
- offer(new EdgeDeleteEvent(applicationScope, edge));
+ offer( new EdgeDeleteEvent( applicationScope, edge ) );
}
public void handleEdgeDelete(final QueueMessage message) {
@@ -367,15 +347,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
- indexService.deleteIndexEdge(applicationScope, edge)
- .doOnNext(ignore -> ack(message)).subscribe();
+ final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+
+ subscibeAndAck( observable, message );
}
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId)));
+ offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
}
public void handleEntityDelete(final QueueMessage message) {
@@ -384,7 +365,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType()));
+ Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
+ String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
final ApplicationScope applicationScope = event.getApplicationScope();
final Id entityId = event.getEntityId();
@@ -392,10 +374,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled())
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
- ack(message);
+ ack( message );
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+ final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
+ entityDeleteResults.getIndexObservable() );
- indexService.deleteEntityIndexes(applicationScope, entityId, UUIDUtils.maxTimeUUID(Long.MAX_VALUE))
- .doOnNext(ignore -> ack(message)).subscribe();
+ subscibeAndAck( merged, message );
}
@@ -407,9 +395,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
- final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+ final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
- ack(message);
+ ack( message );
}
/**
@@ -494,7 +482,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
//change to id scope to avoid serialization issues
- offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
+ offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -502,8 +490,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
List batch = new ArrayList<EdgeScope>();
for ( EdgeScope e : edges){
//change to id scope to avoid serialization issues
- batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+ batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
}
- offerBatch(batch);
+ offerBatch( batch );
+ }
+
+
+ /**
+ * Subscribes to the observable and acks the message via SQS on completion
+ * @param observable
+ * @param message
+ */
+ private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+ observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0a58369..0e773cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -44,7 +44,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final QueueManagerFactory queueManagerFactory;
private final MetricsFactory metricsFactory;
- private final IndexService indexService;
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EventBuilder eventBuilder;
@@ -58,7 +57,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory,
- final IndexService indexService,
final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder,
@@ -68,7 +66,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
this.metricsFactory = metricsFactory;
- this.indexService = indexService;
this.rxTaskScheduler = rxTaskScheduler;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.eventBuilder = eventBuilder;
@@ -97,11 +94,11 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
case SQS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
case SNS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f9f157e..d246e2f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -37,13 +37,14 @@ import rx.Observable;
* Interface for constructing an observable stream to perform asynchronous events
*/
public interface EventBuilder {
+
/**
* Return the cold observable of entity index update operations
* @param applicationScope
* @param entity
* @return
*/
- Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+ Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
/**
* Return the cold observable of the new edge operation
@@ -52,7 +53,7 @@ public interface EventBuilder {
* @param newEdge
* @return
*/
- Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+ Observable<IndexOperationMessage> buildNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
/**
* Return the cold observable of the deleted edge operations
@@ -60,7 +61,7 @@ public interface EventBuilder {
* @param edge
* @return
*/
- Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+ Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
/**
* Return a bean with 2 observable streams for entity delete.
@@ -68,14 +69,14 @@ public interface EventBuilder {
* @param entityId
* @return
*/
- EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+ EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
/**
* Re-index an entity in the scope provided
* @param entityIndexOperation
* @return
*/
- Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
+ Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index e4cd4b5..46cec2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,7 +73,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+ public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
final Entity entity ) {
//process the entity immediately
@@ -89,7 +89,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+ public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
final Edge newEdge ) {
log.debug( "Indexing in app scope {} with entity {} and new edge {}",
@@ -103,8 +103,8 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
- final Edge edge ) {
+ public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+ edge ) {
log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
final Observable<IndexOperationMessage> edgeObservable =
@@ -121,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
@Override
- public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+ public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
@@ -163,7 +163,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
+ public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index adb4a90..830033d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,8 +20,6 @@
package org.apache.usergrid.corepersistence.asyncevents;
-import com.amazonaws.services.opsworks.model.App;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,19 +75,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
//only process the same version, otherwise ignore
- run( eventBuilder.queueEntityIndexUpdate(applicationScope, entity) );
+ run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) );
}
@Override
public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
- run( eventBuilder.queueNewEdge(applicationScope, entity, newEdge) );
+ run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
}
@Override
public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
- run( eventBuilder.queueDeleteEdge(applicationScope, edge) );
+ run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
}
@@ -97,7 +95,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
final EventBuilderImpl.EntityDeleteResults results =
- eventBuilder.queueEntityDelete( applicationScope, entityId );
+ eventBuilder.buildEntityDelete( applicationScope, entityId );
run( results.getIndexObservable() );
run( results.getEntitiesCompacted() );
@@ -107,7 +105,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
- run(eventBuilder.index( entityIndexOperation ));
+ run(eventBuilder.buildEntityIndex( entityIndexOperation ));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -115,7 +113,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
e.getEdge().getTargetNode(), updatedSince);
- run(eventBuilder.index (entityIndexOperation));
+ run(eventBuilder.buildEntityIndex( entityIndexOperation ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 7b79987..81961a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -27,10 +27,24 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
*/
@JsonDeserialize(as = AsyncEvent.class)
public final class EntityIndexEvent extends AsyncEvent {
+
+ private long updatedAfter;
+
public EntityIndexEvent() {
}
- public EntityIndexEvent(EntityIdScope entityIdScope) {
+ public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
super(EventType.ENTITY_INDEX, entityIdScope);
+ this.updatedAfter = updatedAfter;
+ }
+
+
+ public long getUpdatedAfter() {
+ return updatedAfter;
+ }
+
+
+ public void setUpdatedAfter( long updatedAfter ) {
+ this.updatedAfter = updatedAfter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 9cf896c..d37701b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -71,6 +72,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject
public RxTaskScheduler rxTaskScheduler;
+ @Inject
+ public EventBuilder eventBuilder;
+
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -81,8 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
}