You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/08/05 01:34:30 UTC
[1/5] incubator-usergrid git commit: Updates amazon async service
wiring to wire to event builder
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev ba5c49141 -> 9427b4ce4
Updates amazon async service wiring to wire to event builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d67c220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d67c220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d67c220d
Branch: refs/heads/two-dot-o-dev
Commit: d67c220dd7221131a5c7bbcfebcd60f78cddffb1
Parents: b9d2d35
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 12:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 12:30:27 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 132 +++++++++----------
.../asyncevents/AsyncIndexProvider.java | 11 +-
.../asyncevents/EventBuilder.java | 11 +-
.../asyncevents/EventBuilderImpl.java | 12 +-
.../asyncevents/InMemoryAsyncEventService.java | 14 +-
.../asyncevents/model/EntityIndexEvent.java | 16 ++-
.../index/AmazonAsyncEventServiceTest.java | 7 +-
7 files changed, 107 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ec43ab7..0429af3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
-import org.apache.usergrid.corepersistence.CpEntityManager;
+
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.utils.UUIDUtils;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,15 +79,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final QueueManager queue;
private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
- private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
+ private final EventBuilder eventBuilder;
+ private final RxTaskScheduler rxTaskScheduler;
private final Timer readTimer;
private final Timer writeTimer;
private final Timer ackTimer;
+ /**
+ * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+ */
private final Object mutex = new Object();
private final Counter indexErrorCounter;
@@ -99,19 +104,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Inject
- public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig,
- final MetricsFactory metricsFactory,
- final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory
- ) {
-
- this.indexService = indexService;
+ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
+ final MetricsFactory metricsFactory, final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+ final EventBuilder eventBuilder,
+ final RxTaskScheduler rxTaskScheduler ) {
+
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
+ this.eventBuilder = eventBuilder;
+ this.rxTaskScheduler = rxTaskScheduler;
this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -144,7 +147,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
try {
//signal to SQS
- this.queue.sendMessage(operation);
+ this.queue.sendMessage( operation );
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -157,7 +160,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
try {
//signal to SQS
- this.queue.sendMessages(operations);
+ this.queue.sendMessages( operations );
} catch (IOException e) {
throw new RuntimeException("Unable to queue message", e);
} finally {
@@ -189,33 +192,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Ack message in SQS
*/
- public void ack(final List<QueueMessage> messages) {
-
- final Timer.Context timer = this.ackTimer.time();
-
- try{
- // no op
- if (messages.size() == 0) {
- return;
- }
- queue.commitMessages(messages);
-
- //decrement our in-flight counter
- inFlight.addAndGet(-1 * messages.size());
-
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }
- finally {
- timer.stop();
- }
-
-
- }
-
- /**
- * Ack message in SQS
- */
public void ack(final QueueMessage message) {
final Timer.Context timer = this.ackTimer.time();
@@ -290,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity) {
- offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId())));
+ offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0));
}
@@ -298,7 +274,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
- final AsyncEvent event = (AsyncEvent) message.getBody();
+ final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
@@ -307,13 +283,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
//only process the same version, otherwise ignore
final EntityIdScope entityIdScope = event.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+ final Id entityId = entityIdScope.getId();
+ final long updatedAfter = event.getUpdatedAfter();
+
+ final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+ final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
- ecm.load(entityIdScope.getId())
- .first()
- .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
- .doOnNext(ignore -> ack(message)).subscribe();
+ subscibeAndAck( observable, message );
}
@@ -324,7 +301,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge);
- offer(operation);
+ offer( operation );
}
public void handleEdgeIndex(final QueueMessage message) {
@@ -339,18 +316,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
final ApplicationScope applicationScope = event.getApplicationScope();
final Edge edge = event.getEdge();
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
- ecm.load(event.getEntityId())
- .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
- .doOnNext(ignore -> ack(message)).subscribe();
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+ applicationScope, entity, edge ) );
+
+ subscibeAndAck( edgeIndexObservable, message );
}
@Override
public void queueDeleteEdge(final ApplicationScope applicationScope,
final Edge edge) {
- offer(new EdgeDeleteEvent(applicationScope, edge));
+ offer( new EdgeDeleteEvent( applicationScope, edge ) );
}
public void handleEdgeDelete(final QueueMessage message) {
@@ -367,15 +347,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
- indexService.deleteIndexEdge(applicationScope, edge)
- .doOnNext(ignore -> ack(message)).subscribe();
+ final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+
+ subscibeAndAck( observable, message );
}
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId)));
+ offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
}
public void handleEntityDelete(final QueueMessage message) {
@@ -384,7 +365,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType()));
+ Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
+ String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
final ApplicationScope applicationScope = event.getApplicationScope();
final Id entityId = event.getEntityId();
@@ -392,10 +374,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (logger.isDebugEnabled())
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
- ack(message);
+ ack( message );
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+ final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
+ entityDeleteResults.getIndexObservable() );
- indexService.deleteEntityIndexes(applicationScope, entityId, UUIDUtils.maxTimeUUID(Long.MAX_VALUE))
- .doOnNext(ignore -> ack(message)).subscribe();
+ subscibeAndAck( merged, message );
}
@@ -407,9 +395,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
- final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+ final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
- ack(message);
+ ack( message );
}
/**
@@ -494,7 +482,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
//change to id scope to avoid serialization issues
- offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
+ offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -502,8 +490,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
List batch = new ArrayList<EdgeScope>();
for ( EdgeScope e : edges){
//change to id scope to avoid serialization issues
- batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+ batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
}
- offerBatch(batch);
+ offerBatch( batch );
+ }
+
+
+ /**
+ * Subscribes to the observable and acks the message via SQS on completion
+ * @param observable
+ * @param message
+ */
+ private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+ observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0a58369..0e773cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -44,7 +44,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final QueueManagerFactory queueManagerFactory;
private final MetricsFactory metricsFactory;
- private final IndexService indexService;
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EventBuilder eventBuilder;
@@ -58,7 +57,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
final QueueManagerFactory queueManagerFactory,
final MetricsFactory metricsFactory,
- final IndexService indexService,
final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
final EventBuilder eventBuilder,
@@ -68,7 +66,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
this.metricsFactory = metricsFactory;
- this.indexService = indexService;
this.rxTaskScheduler = rxTaskScheduler;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.eventBuilder = eventBuilder;
@@ -97,11 +94,11 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
case SQS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
case SNS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+ return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f9f157e..d246e2f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -37,13 +37,14 @@ import rx.Observable;
* Interface for constructing an observable stream to perform asynchonous events
*/
public interface EventBuilder {
+
/**
* Return the cold observable of entity index update operations
* @param applicationScope
* @param entity
* @return
*/
- Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+ Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
/**
* Return the cold observable of the new edge operation
@@ -52,7 +53,7 @@ public interface EventBuilder {
* @param newEdge
* @return
*/
- Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+ Observable<IndexOperationMessage> buildNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
/**
* Return the cold observable of the deleted edge operations
@@ -60,7 +61,7 @@ public interface EventBuilder {
* @param edge
* @return
*/
- Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+ Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
/**
* Return a ben with 2 obervable streams for entity delete.
@@ -68,14 +69,14 @@ public interface EventBuilder {
* @param entityId
* @return
*/
- EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+ EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
/**
* Re-index an entity in the scope provided
* @param entityIndexOperation
* @return
*/
- Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
+ Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index e4cd4b5..46cec2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,7 +73,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+ public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
final Entity entity ) {
//process the entity immediately
@@ -89,7 +89,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+ public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
final Edge newEdge ) {
log.debug( "Indexing in app scope {} with entity {} and new edge {}",
@@ -103,8 +103,8 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
- final Edge edge ) {
+ public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+ edge ) {
log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
final Observable<IndexOperationMessage> edgeObservable =
@@ -121,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
@Override
- public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+ public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
@@ -163,7 +163,7 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
+ public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index adb4a90..830033d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,8 +20,6 @@
package org.apache.usergrid.corepersistence.asyncevents;
-import com.amazonaws.services.opsworks.model.App;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,19 +75,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
//only process the same version, otherwise ignore
- run( eventBuilder.queueEntityIndexUpdate(applicationScope, entity) );
+ run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) );
}
@Override
public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
- run( eventBuilder.queueNewEdge(applicationScope, entity, newEdge) );
+ run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
}
@Override
public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
- run( eventBuilder.queueDeleteEdge(applicationScope, edge) );
+ run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
}
@@ -97,7 +95,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
final EventBuilderImpl.EntityDeleteResults results =
- eventBuilder.queueEntityDelete( applicationScope, entityId );
+ eventBuilder.buildEntityDelete( applicationScope, entityId );
run( results.getIndexObservable() );
run( results.getEntitiesCompacted() );
@@ -107,7 +105,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
- run(eventBuilder.index( entityIndexOperation ));
+ run(eventBuilder.buildEntityIndex( entityIndexOperation ));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -115,7 +113,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
e.getEdge().getTargetNode(), updatedSince);
- run(eventBuilder.index (entityIndexOperation));
+ run(eventBuilder.buildEntityIndex( entityIndexOperation ));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 7b79987..81961a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -27,10 +27,24 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
*/
@JsonDeserialize(as = AsyncEvent.class)
public final class EntityIndexEvent extends AsyncEvent {
+
+ private long updatedAfter;
+
public EntityIndexEvent() {
}
- public EntityIndexEvent(EntityIdScope entityIdScope) {
+ public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
super(EventType.ENTITY_INDEX, entityIdScope);
+ this.updatedAfter = updatedAfter;
+ }
+
+
+ public long getUpdatedAfter() {
+ return updatedAfter;
+ }
+
+
+ public void setUpdatedAfter( long updatedAfter ) {
+ this.updatedAfter = updatedAfter;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 9cf896c..d37701b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -71,6 +72,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Inject
public RxTaskScheduler rxTaskScheduler;
+ @Inject
+ public EventBuilder eventBuilder;
+
@Inject
public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -81,8 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
+ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
}
[2/5] incubator-usergrid git commit: Fixes runtime bug with joda time
conflict with Astyanax
Posted by gr...@apache.org.
Fixes runtime bug with joda time conflict with Astyanax
Fixes bug in AmazonUtils incorrectly re-throwing missing exception.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/29d115fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/29d115fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/29d115fc
Branch: refs/heads/two-dot-o-dev
Commit: 29d115fc88cc9a058735f3582bb73aee39ec16e7
Parents: d67c220
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 15:13:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 15:13:04 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/queue/pom.xml | 20 +-
.../queue/util/AmazonNotificationUtils.java | 230 ++++++++++---------
2 files changed, 135 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 7780997..2d46dc8 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -54,13 +54,19 @@
<!-- tests -->
- <dependency>
- <groupId>org.apache.usergrid</groupId>
- <artifactId>common</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>org.apache.usergrid</groupId>
+ <artifactId>common</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.8.1</version>
+ </dependency>
<dependency>
<groupId>org.apache.usergrid</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 1d86823..9561a58 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,141 +1,150 @@
package org.apache.usergrid.persistence.queue.util;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.policy.*;
-import com.amazonaws.auth.policy.actions.SQSActions;
-import com.amazonaws.auth.policy.conditions.ConditionFactory;
-import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
-import com.amazonaws.services.sns.util.Topics;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.queue.QueueFig;
+
+import com.amazonaws.auth.policy.Condition;
+import com.amazonaws.auth.policy.Policy;
+import com.amazonaws.auth.policy.Principal;
+import com.amazonaws.auth.policy.Resource;
+import com.amazonaws.auth.policy.Statement;
+import com.amazonaws.auth.policy.actions.SQSActions;
+import com.amazonaws.auth.policy.conditions.ConditionFactory;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.CreateTopicResult;
+import com.amazonaws.services.sns.model.ListTopicsResult;
+import com.amazonaws.services.sns.model.Topic;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
+
+
/**
* Created by Jeff West on 5/25/15.
*/
public class AmazonNotificationUtils {
- private static final Logger logger = LoggerFactory.getLogger(AmazonNotificationUtils.class);
+ private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class );
+
- public static String createQueue(final AmazonSQSClient sqs,
- final String queueName,
- final QueueFig fig)
- throws Exception {
+ public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig )
+ throws Exception {
- final String deadletterQueueName = String.format("%s_dead", queueName);
- final Map<String, String> deadLetterAttributes = new HashMap<>(2);
+ final String deadletterQueueName = String.format( "%s_dead", queueName );
+ final Map<String, String> deadLetterAttributes = new HashMap<>( 2 );
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+ deadLetterAttributes.put( "MessageRetentionPeriod", fig.getDeadletterRetentionPeriod() );
- CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
- .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
+ CreateQueueRequest createDeadLetterQueueRequest =
+ new CreateQueueRequest().withQueueName( deadletterQueueName ).withAttributes( deadLetterAttributes );
- final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+ final CreateQueueResult deadletterResult = sqs.createQueue( createDeadLetterQueueRequest );
- logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
+ logger.info( "Created deadletter queue with url {}", deadletterResult.getQueueUrl() );
- final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
+ final String deadletterArn = AmazonNotificationUtils.getQueueArnByName( sqs, deadletterQueueName );
- String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
- " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+ String redrivePolicy = String
+ .format( "{\"maxReceiveCount\":\"%s\"," + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(),
+ deadletterArn );
- final Map<String, String> queueAttributes = new HashMap<>(2);
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
- deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
+ final Map<String, String> queueAttributes = new HashMap<>( 2 );
+ deadLetterAttributes.put( "MessageRetentionPeriod", fig.getRetentionPeriod() );
+ deadLetterAttributes.put( "RedrivePolicy", redrivePolicy );
CreateQueueRequest createQueueRequest = new CreateQueueRequest().
- withQueueName(queueName)
- .withAttributes(queueAttributes);
+ withQueueName( queueName )
+ .withAttributes( queueAttributes );
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
+ CreateQueueResult result = sqs.createQueue( createQueueRequest );
String url = result.getQueueUrl();
- logger.info("Created SQS queue with url {}", url);
+ logger.info( "Created SQS queue with url {}", url );
return url;
}
- public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs,
- final String queueUrl,
- final List<String> topicARNs) throws Exception{
- String queueARN = getQueueArnByUrl(sqs, queueUrl);
+ public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
+ final List<String> topicARNs ) throws Exception {
- Statement statement = new Statement(Statement.Effect.Allow)
- .withActions(SQSActions.SendMessage)
- .withPrincipals(new Principal("*"))
- .withResources(new Resource(queueARN));
+ String queueARN = getQueueArnByUrl( sqs, queueUrl );
- List<Condition> conditions = new ArrayList<>();
+ Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage )
+ .withPrincipals( new Principal( "*" ) )
+ .withResources( new Resource( queueARN ) );
- for(String topicARN : topicARNs){
+ List<Condition> conditions = new ArrayList<>();
- conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+ for ( String topicARN : topicARNs ) {
+ conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
}
- statement.setConditions(conditions);
+ statement.setConditions( conditions );
- Policy policy = new Policy("SubscriptionPermission").withStatements(statement);
+ Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement );
final Map<String, String> queueAttributes = new HashMap<>();
- queueAttributes.put("Policy", policy.toJson());
+ queueAttributes.put( "Policy", policy.toJson() );
- SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes);
+ SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes );
try {
- sqs.setQueueAttributes(queueAttributesRequest);
- }catch (Exception e){
- logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e);
+ sqs.setQueueAttributes( queueAttributesRequest );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
+ topicARNs.toString(), e );
}
-
-
}
- public static String getQueueArnByName(final AmazonSQSClient sqs,
- final String queueName)
- throws Exception {
+ public static String getQueueArnByName( final AmazonSQSClient sqs, final String queueName ) throws Exception {
String queueUrl = null;
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
queueUrl = result.getQueueUrl();
-
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ }
+ catch ( QueueDoesNotExistException queueDoesNotExistException ) {
//no op, swallow
- logger.warn("Queue {} does not exist", queueName);
+ logger.warn( "Queue {} does not exist", queueName );
return null;
-
- } catch (Exception e) {
- logger.error(String.format("Failed to get URL for Queue [%s] from SQS", queueName), e);
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "Failed to get URL for Queue [%s] from SQS", queueName ), e );
throw e;
}
- if (queueUrl != null) {
+ if ( queueUrl != null ) {
try {
- GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
- .withAttributeNames("All");
+ GetQueueAttributesRequest queueAttributesRequest =
+ new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
- GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+ GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
- return sqsAttributeMap.get("QueueArn");
-
- } catch (Exception e) {
- logger.error("Failed to get queue URL from service", e);
+ return sqsAttributeMap.get( "QueueArn" );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to get queue URL from service", e );
throw e;
}
}
@@ -143,75 +152,80 @@ public class AmazonNotificationUtils {
return null;
}
- public static String getQueueArnByUrl(final AmazonSQSClient sqs,
- final String queueUrl)
- throws Exception {
+
+ public static String getQueueArnByUrl( final AmazonSQSClient sqs, final String queueUrl ) throws Exception {
try {
- GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
- .withAttributeNames("All");
+ GetQueueAttributesRequest queueAttributesRequest =
+ new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
- GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+ GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
- return sqsAttributeMap.get("QueueArn");
-
- } catch (Exception e) {
- logger.error("Failed to get queue URL from service", e);
+ return sqsAttributeMap.get( "QueueArn" );
+ }
+ catch ( Exception e ) {
+ logger.error( "Failed to get queue URL from service", e );
throw e;
}
}
- public static String getTopicArn(final AmazonSNSClient sns,
- final String queueName,
- final boolean createOnMissing)
- throws Exception {
- if (logger.isDebugEnabled())
- logger.debug("Looking up Topic ARN: {}", queueName);
+ public static String getTopicArn( final AmazonSNSClient sns, final String queueName, final boolean createOnMissing )
+ throws Exception {
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Looking up Topic ARN: {}", queueName );
+ }
ListTopicsResult listTopicsResult = sns.listTopics();
String topicArn = null;
- for (Topic topic : listTopicsResult.getTopics()) {
+ for ( Topic topic : listTopicsResult.getTopics() ) {
String arn = topic.getTopicArn();
- if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) {
+ if ( queueName.equals( arn.substring( arn.lastIndexOf( ':' ) ) ) ) {
topicArn = arn;
- logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName);
+ logger.info( "Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName );
}
}
- if (topicArn == null && createOnMissing) {
- logger.info("Creating topic for queue=[{}]...", queueName);
+ if ( topicArn == null && createOnMissing ) {
+ logger.info( "Creating topic for queue=[{}]...", queueName );
- CreateTopicResult createTopicResult = sns.createTopic(queueName);
+ CreateTopicResult createTopicResult = sns.createTopic( queueName );
topicArn = createTopicResult.getTopicArn();
- logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn);
- } else {
- logger.error("Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName, createOnMissing);
+ logger.info( "Successfully created topic with name {} and arn {}", queueName, topicArn );
+ }
+ else {
+ logger.error( "Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName,
+ createOnMissing );
}
- if (logger.isDebugEnabled())
- logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName );
+ }
return topicArn;
}
- public static String getQueueUrlByName(final AmazonSQSClient sqs,
- final String queueName) {
+
+ public static String getQueueUrlByName( final AmazonSQSClient sqs, final String queueName ) {
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
return result.getQueueUrl();
- } catch (QueueDoesNotExistException e) {
- logger.error("Queue {} does not exist", queueName);
- throw e;
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
+ }
+ catch ( QueueDoesNotExistException e ) {
+ //no op, return null
+ logger.error( "Queue {} does not exist", queueName );
+ return null;
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to get queue from service", e );
throw e;
}
}
[4/5] incubator-usergrid git commit: Fixes fat finger
Posted by gr...@apache.org.
Fixes fat finger
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0f94609e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0f94609e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0f94609e
Branch: refs/heads/two-dot-o-dev
Commit: 0f94609eabc5e563428b51590da93c61b397dd3e
Parents: 491008e
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 17:31:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 17:31:16 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f94609e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 85aecdf..f5ef2fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -299,7 +299,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
- subscibeAndAck( observable, message );
+ subscribeAndAck( observable, message );
}
@@ -334,7 +334,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) );
- subscibeAndAck( edgeIndexObservable, message );
+ subscribeAndAck( edgeIndexObservable, message );
}
@Override
@@ -363,7 +363,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
- subscibeAndAck( observable, message );
+ subscribeAndAck( observable, message );
}
@@ -399,7 +399,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
entityDeleteResults.getIndexObservable() );
- subscibeAndAck( merged, message );
+ subscribeAndAck( merged, message );
}
@@ -520,7 +520,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
* @param observable
* @param message
*/
- private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+ private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
observable.doOnCompleted( ()-> ack(message) ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
}
}
[3/5] incubator-usergrid git commit: Upgrades Jackson to fix this
issue.
Posted by gr...@apache.org.
Upgrades Jackson to fix this issue.
https://github.com/FasterXML/jackson-databind/issues/656
Upgrades events to be polymorphic to allow for easy addition and mapping of events without 1 large class for all events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/491008e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/491008e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/491008e9
Branch: refs/heads/two-dot-o-dev
Commit: 491008e961ac893f734d12352af2ac040b1ea1fe
Parents: 29d115f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 17:02:53 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 17:02:53 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 145 ++++++++++--------
.../asyncevents/EventBuilderImpl.java | 9 +-
.../asyncevents/model/AsyncEvent.java | 153 +++----------------
.../asyncevents/model/EdgeDeleteEvent.java | 36 ++++-
.../asyncevents/model/EdgeIndexEvent.java | 39 ++++-
.../asyncevents/model/EntityDeleteEvent.java | 18 ++-
.../asyncevents/model/EntityIndexEvent.java | 18 ++-
.../model/InitializeApplicationIndexEvent.java | 21 ++-
.../index/AmazonAsyncEventServiceTest.java | 3 -
.../index/AsyncIndexServiceTest.java | 14 +-
stack/corepersistence/pom.xml | 3 +-
.../index/IndexLocationStrategy.java | 4 +-
12 files changed, 222 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0429af3..85aecdf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,20 +25,20 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import com.codahale.metrics.Histogram;
-import com.google.common.base.Preconditions;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.*;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -46,6 +46,10 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -56,7 +60,9 @@ import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -211,43 +217,42 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- private void handleMessages(final List<QueueMessage> messages) {
- if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());
- for (QueueMessage message : messages) {
- final AsyncEvent event = (AsyncEvent) message.getBody();
-
- if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());
-
- if (event == null || event.getEventType() == null) {
- logger.error("AsyncEvent type or event is null!");
- } else {
- switch (event.getEventType()) {
+ private void handleMessages( final List<QueueMessage> messages ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "handleMessages with {} message", messages.size() );
+ }
- case EDGE_DELETE:
- handleEdgeDelete(message);
- break;
+ for ( QueueMessage message : messages ) {
+ final AsyncEvent event = ( AsyncEvent ) message.getBody();
- case EDGE_INDEX:
- handleEdgeIndex(message);
- break;
+ logger.debug( "Processing {} event", event );
- case ENTITY_DELETE:
- handleEntityDelete(message);
- break;
+ if ( event == null ) {
+ logger.error( "AsyncEvent type or event is null!" );
+ continue;
+ }
- case ENTITY_INDEX:
- handleEntityIndexUpdate(message);
- break;
- case APPLICATION_INDEX:
- handleInitializeApplicationIndex(message);
- break;
+ if ( event instanceof EdgeDeleteEvent ) {
+ handleEdgeDelete( message );
+ }
+ else if ( event instanceof EdgeIndexEvent ) {
+ handleEdgeIndex( message );
+ }
- default:
- logger.error("Unknown EventType: {}", event.getEventType());
+ else if ( event instanceof EntityDeleteEvent ) {
+ handleEntityDelete( message );
+ }
+ else if ( event instanceof EntityIndexEvent ) {
+ handleEntityIndexUpdate( message );
+ }
- }
+ else if ( event instanceof InitializeApplicationIndexEvent ) {
+ handleInitializeApplicationIndex( message );
+ }
+ else {
+ logger.error( "Unknown EventType: {}", event );
}
messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
@@ -257,7 +262,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
+ applicationScope );
offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}
@@ -272,19 +278,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void handleEntityIndexUpdate(final QueueMessage message) {
- Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
+ Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
- final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
+ final AsyncEvent event = ( AsyncEvent ) message.getBody();
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
+ Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
+
+ final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
+
//process the entity immediately
//only process the same version, otherwise ignore
- final EntityIdScope entityIdScope = event.getEntityIdScope();
+ final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
final Id entityId = entityIdScope.getId();
- final long updatedAfter = event.getUpdatedAfter();
+ final long updatedAfter = entityIndexEvent.getUpdatedAfter();
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
@@ -310,17 +319,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
+ Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
+
+ final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Edge edge = event.getEdge();
+ final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+ final Edge edge = edgeIndexEvent.getEdge();
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
applicationScope, entity, edge ) );
subscibeAndAck( edgeIndexObservable, message );
@@ -339,11 +350,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
+ Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Edge edge = event.getEdge();
+
+ final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+
+ final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+ final Edge edge = edgeIndexEvent.getEdge();
if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
@@ -364,12 +378,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
- Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
- String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
+ Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+ String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
- final ApplicationScope applicationScope = event.getApplicationScope();
- final Id entityId = event.getEntityId();
+
+ final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+ final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+ final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
if (logger.isDebugEnabled())
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
@@ -391,10 +407,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
- Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
+ Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+ final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+ ( InitializeApplicationIndexEvent ) event;
- final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
+ final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
index.initialize();
ack( message );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 46cec2e..4bf5695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -174,8 +174,15 @@ public class EventBuilderImpl implements EventBuilder {
entity -> {
final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+ /**
+ * We don't have a modified field, so we can't check, pass it through
+ */
+ if ( modified == null ) {
+ return true;
+ }
+
//only re-index if it has been updated and been updated after our timestamp
- return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+ return modified.getValue() >= entityIndexOperation.getUpdatedSince();
} )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 3d22986..6b45297 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -19,155 +19,42 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import java.io.Serializable;
+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import java.io.Serializable;
/**
- * Created by Jeff West on 5/25/15.
+ * Marker class for serialization
*/
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AsyncEvent implements Serializable {
-
- @JsonProperty
- protected IndexLocationStrategy indexLocationStrategy;
-
- @JsonProperty
- protected EventType eventType;
-
- @JsonProperty
- protected EntityIdScope entityIdScope;
-
- @JsonProperty
- protected ApplicationScope applicationScope;
-
- @JsonProperty
- protected Id entityId;
+@JsonIgnoreProperties( ignoreUnknown = true )
+@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
+@JsonSubTypes( {
+ @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
+ @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
+ @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
+ @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
+ @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+} )
- @JsonProperty
- protected Edge edge;
+public abstract class AsyncEvent implements Serializable {
@JsonProperty
protected long creationTime;
- /**
- * required for jackson, do not delete
- */
+ //set by default, will be overridden when de-serializing
protected AsyncEvent() {
+ creationTime = System.currentTimeMillis();
}
- public AsyncEvent(final EventType eventType) {
-
- this.eventType = eventType;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(final EventType eventType,
- final EntityIdScope entityIdScope) {
-
- this.eventType = eventType;
- this.entityIdScope = entityIdScope;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
- this.eventType = eventType;
- this.indexLocationStrategy = indexLocationStrategy;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
- this.eventType = eventType;
- this.applicationScope = applicationScope;
- this.edge = edge;
- this.creationTime = System.currentTimeMillis();
- }
-
- public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Id entityId, Edge edge) {
- this.eventType = eventType;
- this.applicationScope = applicationScope;
- this.edge = edge;
- this.entityId = entityId;
- this.creationTime = System.currentTimeMillis();
- }
-
- @JsonSerialize()
- public final Id getEntityId() {
- return entityId;
- }
-
- protected void setEntityId(Id entityId) {
- this.entityId = entityId;
- }
-
- @JsonSerialize()
- public final EventType getEventType() {
- return eventType;
- }
-
- protected void setEventType(EventType eventType) {
- this.eventType = eventType;
- }
-
- @JsonSerialize()
- public EntityIdScope getEntityIdScope() {
- return entityIdScope;
- }
-
- protected void setEntityIdScope(EntityIdScope entityIdScope) {
- this.entityIdScope = entityIdScope;
- }
-
- @JsonSerialize()
- public ApplicationScope getApplicationScope() {
- return applicationScope;
- }
-
- protected void setApplicationScope(ApplicationScope applicationScope) {
- this.applicationScope = applicationScope;
- }
-
- @JsonSerialize()
- @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class)
- public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
-
- protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
- this.indexLocationStrategy = indexLocationStrategy;
- }
-
- @JsonSerialize()
- public Edge getEdge() {
- return edge;
- }
-
- @JsonSerialize()
- public long getCreationTime() { return creationTime; }
-
- protected void setEdge(Edge edge) {
- this.edge = edge;
- }
-
- public enum EventType {
- EDGE_DELETE,
- EDGE_INDEX,
- ENTITY_DELETE,
- ENTITY_INDEX,
- APPLICATION_INDEX;
-
- public String asString() {
- return toString();
- }
+ public long getCreationTime() {
+ return creationTime;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
index 3af9818..af16bac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
@@ -19,19 +19,41 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+
public final class EdgeDeleteEvent extends AsyncEvent {
+
+ @JsonProperty
+ protected ApplicationScope applicationScope;
+
+
+ @JsonProperty
+ protected Edge edge;
+
+
public EdgeDeleteEvent() {
}
- public EdgeDeleteEvent(ApplicationScope applicationScope, Edge edge) {
- super(EventType.EDGE_DELETE, applicationScope, edge);
+
+ public EdgeDeleteEvent( ApplicationScope applicationScope, Edge edge ) {
+ this.applicationScope = applicationScope;
+ this.edge = edge;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public Edge getEdge() {
+ return edge;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
index cd0118f..c89b828 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -28,13 +30,19 @@ import org.apache.usergrid.persistence.model.entity.Id;
import java.io.Serializable;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
public final class EdgeIndexEvent
- extends AsyncEvent
- implements Serializable {
+ extends AsyncEvent {
+
+
+ @JsonProperty
+ protected ApplicationScope applicationScope;
+
+ @JsonProperty
+ protected Id entityId;
+
+ @JsonProperty
+ protected Edge edge;
/**
* Needed by jackson
@@ -43,6 +51,23 @@ public final class EdgeIndexEvent
}
public EdgeIndexEvent(ApplicationScope applicationScope, Id entityId, Edge edge) {
- super(EventType.EDGE_INDEX, applicationScope, entityId, edge);
+ this.applicationScope = applicationScope;
+ this.entityId = entityId;
+ this.edge = edge;
+ }
+
+
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+
+ public Edge getEdge() {
+ return edge;
+ }
+
+
+ public Id getEntityId() {
+ return entityId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 606deae..847a07d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -19,20 +19,28 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
public final class EntityDeleteEvent extends AsyncEvent {
+
+
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+
public EntityDeleteEvent() {
}
public EntityDeleteEvent(EntityIdScope entityIdScope) {
- super(EventType.ENTITY_DELETE, entityIdScope);
+ this.entityIdScope = entityIdScope;
+ }
+
+
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 81961a0..a04326a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -19,22 +19,26 @@
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
public final class EntityIndexEvent extends AsyncEvent {
+
+ @JsonProperty
+ protected EntityIdScope entityIdScope;
+
+ @JsonProperty
private long updatedAfter;
public EntityIndexEvent() {
}
public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
- super(EventType.ENTITY_INDEX, entityIdScope);
+ this.entityIdScope = entityIdScope;
this.updatedAfter = updatedAfter;
}
@@ -44,7 +48,7 @@ public final class EntityIndexEvent extends AsyncEvent {
}
- public void setUpdatedAfter( long updatedAfter ) {
- this.updatedAfter = updatedAfter;
+ public EntityIdScope getEntityIdScope() {
+ return entityIdScope;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 8b20651..68f0113 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -19,20 +19,31 @@
*/
package org.apache.usergrid.corepersistence.asyncevents.model;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
/**
* event to init app index
*/
-@JsonDeserialize(as = AsyncEvent.class)
+
public class InitializeApplicationIndexEvent extends AsyncEvent {
- public InitializeApplicationIndexEvent() {
- super(EventType.APPLICATION_INDEX);
- }
+
+
+ @JsonProperty
+ protected IndexLocationStrategy indexLocationStrategy;
+
public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
- super(EventType.APPLICATION_INDEX, indexLocationStrategy);
+ this.indexLocationStrategy = indexLocationStrategy;
+
+ }
+
+ public IndexLocationStrategy getIndexLocationStrategy() {
+ return indexLocationStrategy;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index d37701b..4660389 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -67,9 +67,6 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
public MetricsFactory metricsFactory;
@Inject
- public IndexService indexService;
-
- @Inject
public RxTaskScheduler rxTaskScheduler;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 9b104fc..d34a1a9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -131,7 +131,7 @@ public abstract class AsyncIndexServiceTest {
/**
- * Write 10k edges 10 at a time in parallel
+ * Write 500 edges
*/
@@ -139,19 +139,19 @@ public abstract class AsyncIndexServiceTest {
final Id connectingId = createId("connecting");
final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
- return graphManager.writeEdge(edge).subscribeOn(Schedulers.io());
+ return graphManager.writeEdge( edge );
}).toList().toBlocking().last();
+ //queue up processing
asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity );
- emf.refreshIndex(applicationScope.getApplication().getUuid());
-
- // Thread.sleep( 1000000000000l );
final EntityIndex EntityIndex =
entityIndexFactory.createEntityIndex( indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
+ emf.refreshIndex(applicationScope.getApplication().getUuid());
+
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
//query until it's available
@@ -176,13 +176,13 @@ public abstract class AsyncIndexServiceTest {
}
- private CandidateResults getResults( final EntityIndex EntityIndex,
+ private CandidateResults getResults( final EntityIndex entityIndex,
final SearchEdge searchEdge, final SearchTypes searchTypes, final int expectedSize, final int attempts ) {
for ( int i = 0; i < attempts; i++ ) {
final CandidateResults candidateResults =
- EntityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
+ entityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
if ( candidateResults.size() == expectedSize ) {
return candidateResults;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index b95f3e1..1ee5272 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -65,8 +65,7 @@ limitations under the License.
<guice.version>4.0-beta5</guice.version>
<guicyfig.version>3.2</guicyfig.version>
<hystrix.version>1.4.0</hystrix.version>
- <jackson-2-version>2.4.1</jackson-2-version>
- <jackson-smile.verson>2.4.3</jackson-smile.verson>
+ <jackson-2-version>2.6.0</jackson-2-version>
<mockito.version>1.10.8</mockito.version>
<junit.version>4.11</junit.version>
<kryo-serializers.version>0.26</kryo-serializers.version>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
index 6b07b9e..e5c8f8f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
@@ -19,6 +19,8 @@
*/
package org.apache.usergrid.persistence.index;
+import java.io.Serializable;
+
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
/**
* location strategy for index
@@ -33,7 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
applicationIndexName = {indexRoot}_applications_{bucketId}
applicationAliasName = {indexRoot}_{appId}_read_alias || {indexRoot}_{appId}_write_alias
*/
-public interface IndexLocationStrategy {
+public interface IndexLocationStrategy extends Serializable {
/**
* get the alias name
* @return
[5/5] incubator-usergrid git commit: Merge commit
'0f94609eabc5e563428b51590da93c61b397dd3e' into two-dot-o-dev
Posted by gr...@apache.org.
Merge commit '0f94609eabc5e563428b51590da93c61b397dd3e' into two-dot-o-dev
* commit '0f94609eabc5e563428b51590da93c61b397dd3e':
Fixes fat finger
Upgrades Jackson to fix this issue.
Fixes runtime bug with joda time conflict with Astyanax
Updates amazon async service wiring to wire to event builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9427b4ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9427b4ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9427b4ce
Branch: refs/heads/two-dot-o-dev
Commit: 9427b4ce47f389e517231b97c09c588f0cbf18f4
Parents: ba5c491 0f94609
Author: GERey <gr...@apigee.com>
Authored: Tue Aug 4 16:34:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Tue Aug 4 16:34:15 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 261 ++++++++++---------
.../asyncevents/AsyncIndexProvider.java | 11 +-
.../asyncevents/EventBuilder.java | 11 +-
.../asyncevents/EventBuilderImpl.java | 21 +-
.../asyncevents/InMemoryAsyncEventService.java | 14 +-
.../asyncevents/model/AsyncEvent.java | 153 ++---------
.../asyncevents/model/EdgeDeleteEvent.java | 36 ++-
.../asyncevents/model/EdgeIndexEvent.java | 39 ++-
.../asyncevents/model/EntityDeleteEvent.java | 18 +-
.../asyncevents/model/EntityIndexEvent.java | 30 ++-
.../model/InitializeApplicationIndexEvent.java | 21 +-
.../index/AmazonAsyncEventServiceTest.java | 8 +-
.../index/AsyncIndexServiceTest.java | 14 +-
stack/corepersistence/pom.xml | 3 +-
.../index/IndexLocationStrategy.java | 4 +-
stack/corepersistence/queue/pom.xml | 20 +-
.../queue/util/AmazonNotificationUtils.java | 230 ++++++++--------
17 files changed, 453 insertions(+), 441 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9427b4ce/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------