You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/08/10 19:29:19 UTC

[01/13] incubator-usergrid git commit: Updates amazon async service wiring to wire to event builder

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-903 0a78e1544 -> 4d75b6b10


Updates amazon async service wiring to wire to event builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d67c220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d67c220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d67c220d

Branch: refs/heads/USERGRID-903
Commit: d67c220dd7221131a5c7bbcfebcd60f78cddffb1
Parents: b9d2d35
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 12:30:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 12:30:27 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 132 +++++++++----------
 .../asyncevents/AsyncIndexProvider.java         |  11 +-
 .../asyncevents/EventBuilder.java               |  11 +-
 .../asyncevents/EventBuilderImpl.java           |  12 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  14 +-
 .../asyncevents/model/EntityIndexEvent.java     |  16 ++-
 .../index/AmazonAsyncEventServiceTest.java      |   7 +-
 7 files changed, 107 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ec43ab7..0429af3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -27,14 +27,15 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
-import org.apache.usergrid.corepersistence.CpEntityManager;
+
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
 import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.utils.UUIDUtils;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,15 +79,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final QueueManager queue;
     private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
-    private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
+    private final EventBuilder eventBuilder;
+    private final RxTaskScheduler rxTaskScheduler;
 
     private final Timer readTimer;
     private final Timer writeTimer;
     private final Timer ackTimer;
 
+    /**
+     * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+     */
     private final Object mutex = new Object();
 
     private final Counter indexErrorCounter;
@@ -99,19 +104,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     @Inject
-    public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
-                                   final IndexProcessorFig indexProcessorFig,
-                                   final MetricsFactory metricsFactory,
-                                   final IndexService indexService,
-                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                   final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                   final EntityIndexFactory entityIndexFactory
-    ) {
-
-        this.indexService = indexService;
+    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory, final IndexProcessorFig indexProcessorFig,
+                                    final MetricsFactory metricsFactory,  final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                    final IndexLocationStrategyFactory indexLocationStrategyFactory, final EntityIndexFactory entityIndexFactory,
+                                    final EventBuilder eventBuilder,
+                                    final RxTaskScheduler rxTaskScheduler ) {
+
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
+        this.eventBuilder = eventBuilder;
+        this.rxTaskScheduler = rxTaskScheduler;
 
         this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -144,7 +147,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessage(operation);
+            this.queue.sendMessage( operation );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -157,7 +160,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         try {
             //signal to SQS
-            this.queue.sendMessages(operations);
+            this.queue.sendMessages( operations );
         } catch (IOException e) {
             throw new RuntimeException("Unable to queue message", e);
         } finally {
@@ -189,33 +192,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Ack message in SQS
      */
-    public void ack(final List<QueueMessage> messages) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            // no op
-            if (messages.size() == 0) {
-                return;
-            }
-            queue.commitMessages(messages);
-
-            //decrement our in-flight counter
-            inFlight.addAndGet(-1 * messages.size());
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }
-        finally {
-            timer.stop();
-        }
-
-
-    }
-
-    /**
-     * Ack message in SQS
-     */
     public void ack(final QueueMessage message) {
 
         final Timer.Context timer = this.ackTimer.time();
@@ -290,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
                                        final Entity entity) {
 
-        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId())));
+        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, entity.getId()), 0));
     }
 
 
@@ -298,7 +274,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
 
-        final AsyncEvent event = (AsyncEvent) message.getBody();
+        final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
 
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
         Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
@@ -307,13 +283,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
         final EntityIdScope entityIdScope = event.getEntityIdScope();
         final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final Id entityId = entityIdScope.getId();
+        final long updatedAfter = event.getUpdatedAfter();
+
+        final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+        final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
 
-        ecm.load(entityIdScope.getId())
-                .first()
-                .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
-                .doOnNext(ignore -> ack(message)).subscribe();
+        subscibeAndAck( observable, message );
     }
 
 
@@ -324,7 +301,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         EdgeIndexEvent operation = new EdgeIndexEvent(applicationScope, entity.getId(), newEdge);
 
-        offer(operation);
+        offer( operation );
     }
 
     public void handleEdgeIndex(final QueueMessage message) {
@@ -339,18 +316,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final ApplicationScope applicationScope = event.getApplicationScope();
         final Edge edge = event.getEdge();
 
-        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
 
-        ecm.load(event.getEntityId())
-                .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
-                .doOnNext(ignore -> ack(message)).subscribe();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+            applicationScope, entity, edge ) );
+
+        subscibeAndAck( edgeIndexObservable, message );
     }
 
     @Override
     public void queueDeleteEdge(final ApplicationScope applicationScope,
                                 final Edge edge) {
 
-        offer(new EdgeDeleteEvent(applicationScope, edge));
+        offer( new EdgeDeleteEvent( applicationScope, edge ) );
     }
 
     public void handleEdgeDelete(final QueueMessage message) {
@@ -367,15 +347,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
 
-        indexService.deleteIndexEdge(applicationScope, edge)
-                .doOnNext(ignore -> ack(message)).subscribe();
+        final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
+
+        subscibeAndAck( observable, message );
     }
 
 
     @Override
     public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
 
-        offer(new EntityDeleteEvent(new EntityIdScope(applicationScope, entityId)));
+        offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
     }
 
     public void handleEntityDelete(final QueueMessage message) {
@@ -384,7 +365,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, String.format("Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType()));
+        Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
+            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
 
         final ApplicationScope applicationScope = event.getApplicationScope();
         final Id entityId = event.getEntityId();
@@ -392,10 +374,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
         if (logger.isDebugEnabled())
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
 
-        ack(message);
+        ack( message );
+
+        final EventBuilderImpl.EntityDeleteResults
+            entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+        final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
+            entityDeleteResults.getIndexObservable() );
 
-        indexService.deleteEntityIndexes(applicationScope, entityId, UUIDUtils.maxTimeUUID(Long.MAX_VALUE))
-                .doOnNext(ignore -> ack(message)).subscribe();
+        subscibeAndAck( merged, message );
     }
 
 
@@ -407,9 +395,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
         Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
 
         final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
-        final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+        final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
         index.initialize();
-        ack(message);
+        ack( message );
     }
 
     /**
@@ -494,7 +482,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
         //change to id scope to avoid serialization issues
-        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id)));
+        offer(new EntityIndexEvent(new EntityIdScope(applicationScope, id), updatedSince));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -502,8 +490,18 @@ public class AmazonAsyncEventService implements AsyncEventService {
         List batch = new ArrayList<EdgeScope>();
         for ( EdgeScope e : edges){
             //change to id scope to avoid serialization issues
-            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode())));
+            batch.add(new EntityIndexEvent(new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
         }
-        offerBatch(batch);
+        offerBatch( batch );
+    }
+
+
+    /**
+     * Subscribes to the observable and acks the message via SQS on completion
+     * @param observable
+     * @param message
+     */
+    private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+       observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 0a58369..0e773cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -44,7 +44,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
     private final QueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
-    private final IndexService indexService;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EventBuilder eventBuilder;
@@ -58,7 +57,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
                               final QueueManagerFactory queueManagerFactory,
                               final MetricsFactory metricsFactory,
-                              final IndexService indexService,
                               final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
                               final EventBuilder eventBuilder,
@@ -68,7 +66,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
         this.metricsFactory = metricsFactory;
-        this.indexService = indexService;
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.eventBuilder = eventBuilder;
@@ -97,11 +94,11 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             case LOCAL:
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
             case SQS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             case SNS:
-                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
+                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory,
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f9f157e..d246e2f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -37,13 +37,14 @@ import rx.Observable;
  * Interface for constructing an observable stream to perform asynchonous events
  */
 public interface EventBuilder {
+
     /**
      * Return the cold observable of entity index update operations
      * @param applicationScope
      * @param entity
      * @return
      */
-    Observable<IndexOperationMessage> queueEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
+    Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
 
     /**
      * Return the cold observable of the new edge operation
@@ -52,7 +53,7 @@ public interface EventBuilder {
      * @param newEdge
      * @return
      */
-    Observable<IndexOperationMessage> queueNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
+    Observable<IndexOperationMessage> buildNewEdge( ApplicationScope applicationScope, Entity entity, Edge newEdge );
 
     /**
      * Return the cold observable of the deleted edge operations
@@ -60,7 +61,7 @@ public interface EventBuilder {
      * @param edge
      * @return
      */
-    Observable<IndexOperationMessage> queueDeleteEdge( ApplicationScope applicationScope, Edge edge );
+    Observable<IndexOperationMessage> buildDeleteEdge( ApplicationScope applicationScope, Edge edge );
 
     /**
      * Return a ben with 2 obervable streams for entity delete.
@@ -68,14 +69,14 @@ public interface EventBuilder {
      * @param entityId
      * @return
      */
-    EventBuilderImpl.EntityDeleteResults queueEntityDelete( ApplicationScope applicationScope, Id entityId );
+    EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
 
     /**
      * Re-index an entity in the scope provided
      * @param entityIndexOperation
      * @return
      */
-    Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
+    Observable<IndexOperationMessage> buildEntityIndex( EntityIndexOperation entityIndexOperation );
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index e4cd4b5..46cec2e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,7 +73,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueEntityIndexUpdate( final ApplicationScope applicationScope,
+    public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
                                                                      final Entity entity ) {
 
         //process the entity immediately
@@ -89,7 +89,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueNewEdge( final ApplicationScope applicationScope, final Entity entity,
+    public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
                                                            final Edge newEdge ) {
 
         log.debug( "Indexing  in app scope {} with entity {} and new edge {}",
@@ -103,8 +103,8 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> queueDeleteEdge( final ApplicationScope applicationScope,
-                                                              final Edge edge ) {
+    public Observable<IndexOperationMessage> buildDeleteEdge( final ApplicationScope applicationScope, final Edge
+        edge ) {
         log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
         final Observable<IndexOperationMessage> edgeObservable =
@@ -121,7 +121,7 @@ public class EventBuilderImpl implements EventBuilder {
     //it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
 
     @Override
-    public EntityDeleteResults queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+    public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
         log.debug( "Deleting entity id from index in app scope {} with entityId {} }", applicationScope, entityId );
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
@@ -163,7 +163,7 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
+    public Observable<IndexOperationMessage> buildEntityIndex( final EntityIndexOperation entityIndexOperation ) {
 
         final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index adb4a90..830033d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,8 +20,6 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
-import com.amazonaws.services.opsworks.model.App;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,19 +75,19 @@ public class InMemoryAsyncEventService implements AsyncEventService {
         //only process the same version, otherwise ignore
 
 
-        run( eventBuilder.queueEntityIndexUpdate(applicationScope, entity) );
+        run( eventBuilder.buildEntityIndexUpdate( applicationScope, entity ) );
     }
 
 
     @Override
     public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-        run( eventBuilder.queueNewEdge(applicationScope, entity, newEdge) );
+        run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
     }
 
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
-        run( eventBuilder.queueDeleteEdge(applicationScope, edge) );
+        run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
     }
 
 
@@ -97,7 +95,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
 
         final EventBuilderImpl.EntityDeleteResults results =
-            eventBuilder.queueEntityDelete( applicationScope, entityId );
+            eventBuilder.buildEntityDelete( applicationScope, entityId );
 
         run( results.getIndexObservable() );
         run( results.getEntitiesCompacted() );
@@ -107,7 +105,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
 
-        run(eventBuilder.index( entityIndexOperation ));
+        run(eventBuilder.buildEntityIndex( entityIndexOperation ));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -115,7 +113,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
             final EntityIndexOperation entityIndexOperation = new EntityIndexOperation(e.getApplicationScope(),
                 e.getEdge().getTargetNode(), updatedSince);
 
-            run(eventBuilder.index (entityIndexOperation));
+            run(eventBuilder.buildEntityIndex( entityIndexOperation ));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 7b79987..81961a0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -27,10 +27,24 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
  */
 @JsonDeserialize(as = AsyncEvent.class)
 public final class EntityIndexEvent extends AsyncEvent {
+
+    private long updatedAfter;
+
     public EntityIndexEvent() {
     }
 
-    public EntityIndexEvent(EntityIdScope entityIdScope) {
+    public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
         super(EventType.ENTITY_INDEX, entityIdScope);
+        this.updatedAfter = updatedAfter;
+    }
+
+
+    public long getUpdatedAfter() {
+        return updatedAfter;
+    }
+
+
+    public void setUpdatedAfter( long updatedAfter ) {
+        this.updatedAfter = updatedAfter;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d67c220d/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 9cf896c..d37701b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.corepersistence.asyncevents.EventBuilder;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
@@ -71,6 +72,9 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public RxTaskScheduler rxTaskScheduler;
 
+    @Inject
+    public EventBuilder eventBuilder;
+
 
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -81,8 +85,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
     }
 
 


[02/13] incubator-usergrid git commit: Invalidate the migration info cache after resetting the version.

Posted by gr...@apache.org.
Invalidate the migration info cache after resetting the version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ba5c4914
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ba5c4914
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ba5c4914

Branch: refs/heads/USERGRID-903
Commit: ba5c4914167335c5b4ca292f6f1058eeb97934ee
Parents: 6b90b66
Author: Michael Russo <mi...@gmail.com>
Authored: Tue Aug 4 11:53:13 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Tue Aug 4 11:53:13 2015 -0700

----------------------------------------------------------------------
 .../persistence/core/migration/data/DataMigrationManagerImpl.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba5c4914/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index 714aa11..5d41abe 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -148,6 +148,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
         Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
 
         migrationInfoSerialization.setVersion( pluginName, version );
+        migrationInfoCache.invalidateAll();
     }
 
 


[03/13] incubator-usergrid git commit: Fixes runtime bug with joda time conflict with Astyanax

Posted by gr...@apache.org.
Fixes runtime bug with joda time conflict with Astyanax

Fixes bug in AmazonUtils incorrectly re-throwing missing exception.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/29d115fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/29d115fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/29d115fc

Branch: refs/heads/USERGRID-903
Commit: 29d115fc88cc9a058735f3582bb73aee39ec16e7
Parents: d67c220
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 15:13:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 15:13:04 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/queue/pom.xml             |  20 +-
 .../queue/util/AmazonNotificationUtils.java     | 230 ++++++++++---------
 2 files changed, 135 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 7780997..2d46dc8 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -54,13 +54,19 @@
 
     <!-- tests -->
 
-    <dependency>
-      <groupId>org.apache.usergrid</groupId>
-      <artifactId>common</artifactId>
-      <version>${project.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
+      <dependency>
+          <groupId>org.apache.usergrid</groupId>
+          <artifactId>common</artifactId>
+          <version>${project.version}</version>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+      </dependency>
+
+      <dependency>
+          <groupId>joda-time</groupId>
+          <artifactId>joda-time</artifactId>
+          <version>2.8.1</version>
+      </dependency>
 
     <dependency>
       <groupId>org.apache.usergrid</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/29d115fc/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 1d86823..9561a58 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,141 +1,150 @@
 package org.apache.usergrid.persistence.queue.util;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.policy.*;
-import com.amazonaws.auth.policy.actions.SQSActions;
-import com.amazonaws.auth.policy.conditions.ConditionFactory;
-import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
-import com.amazonaws.services.sns.util.Topics;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.queue.QueueFig;
+
+import com.amazonaws.auth.policy.Condition;
+import com.amazonaws.auth.policy.Policy;
+import com.amazonaws.auth.policy.Principal;
+import com.amazonaws.auth.policy.Resource;
+import com.amazonaws.auth.policy.Statement;
+import com.amazonaws.auth.policy.actions.SQSActions;
+import com.amazonaws.auth.policy.conditions.ConditionFactory;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.CreateTopicResult;
+import com.amazonaws.services.sns.model.ListTopicsResult;
+import com.amazonaws.services.sns.model.Topic;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
+
+
 /**
  * Created by Jeff West on 5/25/15.
  */
 public class AmazonNotificationUtils {
 
-    private static final Logger logger = LoggerFactory.getLogger(AmazonNotificationUtils.class);
+    private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class );
+
 
-    public static String createQueue(final AmazonSQSClient sqs,
-                                     final String queueName,
-                                     final QueueFig fig)
-            throws Exception {
+    public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig )
+        throws Exception {
 
-        final String deadletterQueueName = String.format("%s_dead", queueName);
-        final Map<String, String> deadLetterAttributes = new HashMap<>(2);
+        final String deadletterQueueName = String.format( "%s_dead", queueName );
+        final Map<String, String> deadLetterAttributes = new HashMap<>( 2 );
 
-        deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+        deadLetterAttributes.put( "MessageRetentionPeriod", fig.getDeadletterRetentionPeriod() );
 
-        CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
-                .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
+        CreateQueueRequest createDeadLetterQueueRequest =
+            new CreateQueueRequest().withQueueName( deadletterQueueName ).withAttributes( deadLetterAttributes );
 
-        final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+        final CreateQueueResult deadletterResult = sqs.createQueue( createDeadLetterQueueRequest );
 
-        logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
+        logger.info( "Created deadletter queue with url {}", deadletterResult.getQueueUrl() );
 
-        final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
+        final String deadletterArn = AmazonNotificationUtils.getQueueArnByName( sqs, deadletterQueueName );
 
-        String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
-                " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+        String redrivePolicy = String
+            .format( "{\"maxReceiveCount\":\"%s\"," + " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(),
+                deadletterArn );
 
-        final Map<String, String> queueAttributes = new HashMap<>(2);
-        deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
-        deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
+        final Map<String, String> queueAttributes = new HashMap<>( 2 );
+        deadLetterAttributes.put( "MessageRetentionPeriod", fig.getRetentionPeriod() );
+        deadLetterAttributes.put( "RedrivePolicy", redrivePolicy );
 
         CreateQueueRequest createQueueRequest = new CreateQueueRequest().
-                withQueueName(queueName)
-                .withAttributes(queueAttributes);
+                                                                            withQueueName( queueName )
+                                                                        .withAttributes( queueAttributes );
 
-        CreateQueueResult result = sqs.createQueue(createQueueRequest);
+        CreateQueueResult result = sqs.createQueue( createQueueRequest );
 
         String url = result.getQueueUrl();
 
-        logger.info("Created SQS queue with url {}", url);
+        logger.info( "Created SQS queue with url {}", url );
 
         return url;
     }
 
-    public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs,
-                                                    final String queueUrl,
-                                                    final List<String> topicARNs) throws Exception{
 
-        String queueARN = getQueueArnByUrl(sqs, queueUrl);
+    public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
+                                                     final List<String> topicARNs ) throws Exception {
 
-        Statement statement = new Statement(Statement.Effect.Allow)
-            .withActions(SQSActions.SendMessage)
-            .withPrincipals(new Principal("*"))
-            .withResources(new Resource(queueARN));
+        String queueARN = getQueueArnByUrl( sqs, queueUrl );
 
-        List<Condition> conditions = new ArrayList<>();
+        Statement statement = new Statement( Statement.Effect.Allow ).withActions( SQSActions.SendMessage )
+                                                                     .withPrincipals( new Principal( "*" ) )
+                                                                     .withResources( new Resource( queueARN ) );
 
-        for(String topicARN : topicARNs){
+        List<Condition> conditions = new ArrayList<>();
 
-            conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+        for ( String topicARN : topicARNs ) {
 
+            conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
         }
-        statement.setConditions(conditions);
+        statement.setConditions( conditions );
 
-        Policy policy = new Policy("SubscriptionPermission").withStatements(statement);
+        Policy policy = new Policy( "SubscriptionPermission" ).withStatements( statement );
 
 
         final Map<String, String> queueAttributes = new HashMap<>();
-        queueAttributes.put("Policy", policy.toJson());
+        queueAttributes.put( "Policy", policy.toJson() );
 
-        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes);
+        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest( queueUrl, queueAttributes );
 
         try {
-            sqs.setQueueAttributes(queueAttributesRequest);
-        }catch (Exception e){
-            logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e);
+            sqs.setQueueAttributes( queueAttributesRequest );
+        }
+        catch ( Exception e ) {
+            logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
+                topicARNs.toString(), e );
         }
-
-
     }
 
 
-    public static String getQueueArnByName(final AmazonSQSClient sqs,
-                                           final String queueName)
-            throws Exception {
+    public static String getQueueArnByName( final AmazonSQSClient sqs, final String queueName ) throws Exception {
 
         String queueUrl = null;
 
         try {
-            GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+            GetQueueUrlResult result = sqs.getQueueUrl( queueName );
             queueUrl = result.getQueueUrl();
-
-        } catch (QueueDoesNotExistException queueDoesNotExistException) {
+        }
+        catch ( QueueDoesNotExistException queueDoesNotExistException ) {
             //no op, swallow
-            logger.warn("Queue {} does not exist", queueName);
+            logger.warn( "Queue {} does not exist", queueName );
             return null;
-
-        } catch (Exception e) {
-            logger.error(String.format("Failed to get URL for Queue [%s] from SQS", queueName), e);
+        }
+        catch ( Exception e ) {
+            logger.error( String.format( "Failed to get URL for Queue [%s] from SQS", queueName ), e );
             throw e;
         }
 
-        if (queueUrl != null) {
+        if ( queueUrl != null ) {
 
             try {
-                GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
-                        .withAttributeNames("All");
+                GetQueueAttributesRequest queueAttributesRequest =
+                    new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
 
-                GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+                GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
                 Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
 
-                return sqsAttributeMap.get("QueueArn");
-
-            } catch (Exception e) {
-                logger.error("Failed to get queue URL from service", e);
+                return sqsAttributeMap.get( "QueueArn" );
+            }
+            catch ( Exception e ) {
+                logger.error( "Failed to get queue URL from service", e );
                 throw e;
             }
         }
@@ -143,75 +152,80 @@ public class AmazonNotificationUtils {
         return null;
     }
 
-    public static String getQueueArnByUrl(final AmazonSQSClient sqs,
-                                          final String queueUrl)
-            throws Exception {
+
+    public static String getQueueArnByUrl( final AmazonSQSClient sqs, final String queueUrl ) throws Exception {
 
         try {
-            GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
-                    .withAttributeNames("All");
+            GetQueueAttributesRequest queueAttributesRequest =
+                new GetQueueAttributesRequest( queueUrl ).withAttributeNames( "All" );
 
-            GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+            GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( queueAttributesRequest );
             Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
 
-            return sqsAttributeMap.get("QueueArn");
-
-        } catch (Exception e) {
-            logger.error("Failed to get queue URL from service", e);
+            return sqsAttributeMap.get( "QueueArn" );
+        }
+        catch ( Exception e ) {
+            logger.error( "Failed to get queue URL from service", e );
             throw e;
         }
     }
 
-    public static String getTopicArn(final AmazonSNSClient sns,
-                                     final String queueName,
-                                     final boolean createOnMissing)
-            throws Exception {
 
-        if (logger.isDebugEnabled())
-            logger.debug("Looking up Topic ARN: {}", queueName);
+    public static String getTopicArn( final AmazonSNSClient sns, final String queueName, final boolean createOnMissing )
+        throws Exception {
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Looking up Topic ARN: {}", queueName );
+        }
 
         ListTopicsResult listTopicsResult = sns.listTopics();
         String topicArn = null;
 
-        for (Topic topic : listTopicsResult.getTopics()) {
+        for ( Topic topic : listTopicsResult.getTopics() ) {
             String arn = topic.getTopicArn();
 
-            if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) {
+            if ( queueName.equals( arn.substring( arn.lastIndexOf( ':' ) ) ) ) {
                 topicArn = arn;
 
-                logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName);
+                logger.info( "Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName );
             }
         }
 
-        if (topicArn == null && createOnMissing) {
-            logger.info("Creating topic for queue=[{}]...", queueName);
+        if ( topicArn == null && createOnMissing ) {
+            logger.info( "Creating topic for queue=[{}]...", queueName );
 
-            CreateTopicResult createTopicResult = sns.createTopic(queueName);
+            CreateTopicResult createTopicResult = sns.createTopic( queueName );
             topicArn = createTopicResult.getTopicArn();
 
-            logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn);
-        } else {
-            logger.error("Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName, createOnMissing);
+            logger.info( "Successfully created topic with name {} and arn {}", queueName, topicArn );
+        }
+        else {
+            logger.error( "Error looking up topic ARN for queue=[{}] and createOnMissing=[{}]", queueName,
+                createOnMissing );
         }
 
-        if (logger.isDebugEnabled())
-            logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName );
+        }
 
 
         return topicArn;
     }
 
-    public static String getQueueUrlByName(final AmazonSQSClient sqs,
-                                           final String queueName) {
+
+    public static String getQueueUrlByName( final AmazonSQSClient sqs, final String queueName ) {
 
         try {
-            GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+            GetQueueUrlResult result = sqs.getQueueUrl( queueName );
             return result.getQueueUrl();
-        } catch (QueueDoesNotExistException e) {
-            logger.error("Queue {} does not exist", queueName);
-            throw e;
-        } catch (Exception e) {
-            logger.error("failed to get queue from service", e);
+        }
+        catch ( QueueDoesNotExistException e ) {
+            //no op, return null
+            logger.error( "Queue {} does not exist", queueName );
+            return null;
+        }
+        catch ( Exception e ) {
+            logger.error( "failed to get queue from service", e );
             throw e;
         }
     }


[12/13] incubator-usergrid git commit: Fixes incorrect ack

Posted by gr...@apache.org.
Fixes incorrect ack


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cce5fa5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cce5fa5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cce5fa5f

Branch: refs/heads/USERGRID-903
Commit: cce5fa5f84ffce5d45c0a93735ec8a75d0fd2b33
Parents: 0ca91b0
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 5 14:31:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 5 14:31:22 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java       | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cce5fa5f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f5ef2fc..b71a549 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -390,8 +390,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
         if (logger.isDebugEnabled())
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
 
-        ack( message );
-
         final EventBuilderImpl.EntityDeleteResults
             entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
 


[08/13] incubator-usergrid git commit: Merge remote-tracking branch 'origin/USERGRID-900' into two-dot-o-dev

Posted by gr...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-900' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/70bdb8a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/70bdb8a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/70bdb8a5

Branch: refs/heads/USERGRID-903
Commit: 70bdb8a5c7476d41057fecb38575ca644b26b455
Parents: 6b90b66 94f0349
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Aug 5 10:24:40 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Aug 5 10:24:40 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 261 +++++-----
 .../asyncevents/AsyncIndexProvider.java         |  11 +-
 .../asyncevents/EventBuilder.java               |  11 +-
 .../asyncevents/EventBuilderImpl.java           |  21 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  14 +-
 .../asyncevents/model/AsyncEvent.java           | 153 +-----
 .../asyncevents/model/EdgeDeleteEvent.java      |  36 +-
 .../asyncevents/model/EdgeIndexEvent.java       |  39 +-
 .../asyncevents/model/EntityDeleteEvent.java    |  18 +-
 .../asyncevents/model/EntityIndexEvent.java     |  30 +-
 .../model/InitializeApplicationIndexEvent.java  |  21 +-
 .../index/AmazonAsyncEventServiceTest.java      |   8 +-
 .../index/AsyncIndexServiceTest.java            |  14 +-
 .../PerformanceEntityRebuildIndexTest.java      | 385 --------------
 .../usergrid/persistence/RebuildIndexTest.java  | 515 +++++++++++++++++++
 stack/corepersistence/pom.xml                   |   3 +-
 .../index/IndexLocationStrategy.java            |   4 +-
 stack/corepersistence/queue/pom.xml             |  20 +-
 .../queue/util/AmazonNotificationUtils.java     | 230 +++++----
 19 files changed, 968 insertions(+), 826 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/70bdb8a5/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------


[13/13] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-903

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-903

# By Todd Nine (7) and Michael Russo (1)
# Via Shawn Feldman (2) and others
* 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid:
  Fixes incorrect ack
  Fixes jackson issue with long fields and serialization that is broken in 2.6.0
  Added tests to check for resume functionality
  Fixes fat finger
  Upgrades Jackson to fix this issue.
  Fixes runtime bug with joda time conflict with Astyanax
  Invalidate the migration info cache after resetting the version.
  Updates amazon async service wiring to wire to event builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4d75b6b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4d75b6b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4d75b6b1

Branch: refs/heads/USERGRID-903
Commit: 4d75b6b108c6fc28e943983597043c8ba5ddb4a5
Parents: 0a78e15 cce5fa5
Author: GERey <gr...@apigee.com>
Authored: Wed Aug 5 15:30:16 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Wed Aug 5 15:30:16 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 259 +++++-----
 .../asyncevents/AsyncIndexProvider.java         |  11 +-
 .../asyncevents/EventBuilder.java               |  11 +-
 .../asyncevents/EventBuilderImpl.java           |  21 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  14 +-
 .../asyncevents/model/AsyncEvent.java           | 153 +-----
 .../asyncevents/model/EdgeDeleteEvent.java      |  36 +-
 .../asyncevents/model/EdgeIndexEvent.java       |  39 +-
 .../asyncevents/model/EntityDeleteEvent.java    |  18 +-
 .../asyncevents/model/EntityIndexEvent.java     |  30 +-
 .../model/InitializeApplicationIndexEvent.java  |  21 +-
 .../index/AmazonAsyncEventServiceTest.java      |   8 +-
 .../index/AsyncIndexServiceTest.java            |  14 +-
 .../PerformanceEntityRebuildIndexTest.java      | 385 --------------
 .../usergrid/persistence/RebuildIndexTest.java  | 515 +++++++++++++++++++
 .../data/DataMigrationManagerImpl.java          |   1 +
 stack/corepersistence/pom.xml                   |   1 -
 .../index/IndexLocationStrategy.java            |   4 +-
 stack/corepersistence/queue/pom.xml             |  20 +-
 .../queue/util/AmazonNotificationUtils.java     | 230 +++++----
 20 files changed, 966 insertions(+), 825 deletions(-)
----------------------------------------------------------------------



[11/13] incubator-usergrid git commit: Merge branch 'USERGRID-900' into two-dot-o-dev

Posted by gr...@apache.org.
Merge branch 'USERGRID-900' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0ca91b0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0ca91b0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0ca91b0e

Branch: refs/heads/USERGRID-903
Commit: 0ca91b0eb130663be70e43c20e61465aca9b68d0
Parents: dadc98a 3cb3f6c
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 5 14:08:42 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 5 14:08:42 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[09/13] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dadc98a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dadc98a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dadc98a5

Branch: refs/heads/USERGRID-903
Commit: dadc98a5df7a2cf2521e1bd85331ceef6fd93bf0
Parents: 70bdb8a 9427b4c
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Aug 5 10:24:55 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Aug 5 10:24:55 2015 -0600

----------------------------------------------------------------------
 .../persistence/core/migration/data/DataMigrationManagerImpl.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------



[05/13] incubator-usergrid git commit: Fixes fat finger

Posted by gr...@apache.org.
Fixes fat finger


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0f94609e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0f94609e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0f94609e

Branch: refs/heads/USERGRID-903
Commit: 0f94609eabc5e563428b51590da93c61b397dd3e
Parents: 491008e
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 17:31:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 17:31:16 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java              | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f94609e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 85aecdf..f5ef2fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -299,7 +299,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
 
-        subscibeAndAck( observable, message );
+        subscribeAndAck( observable, message );
     }
 
 
@@ -334,7 +334,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
             applicationScope, entity, edge ) );
 
-        subscibeAndAck( edgeIndexObservable, message );
+        subscribeAndAck( edgeIndexObservable, message );
     }
 
     @Override
@@ -363,7 +363,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final Observable<IndexOperationMessage> observable = eventBuilder.buildDeleteEdge( applicationScope, edge );
 
-        subscibeAndAck( observable, message );
+        subscribeAndAck( observable, message );
     }
 
 
@@ -399,7 +399,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Observable merged = Observable.merge( entityDeleteResults.getEntitiesCompacted(),
             entityDeleteResults.getIndexObservable() );
 
-        subscibeAndAck( merged, message );
+        subscribeAndAck( merged, message );
     }
 
 
@@ -520,7 +520,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
      * @param observable
      * @param message
      */
-    private void subscibeAndAck( final Observable<?> observable, final QueueMessage message ){
+    private void subscribeAndAck( final Observable<?> observable, final QueueMessage message ){
        observable.doOnCompleted( ()-> ack(message)  ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() ).subscribe();
     }
 }


[07/13] incubator-usergrid git commit: Added tests to check for resume functionality

Posted by gr...@apache.org.
Added tests to check for resume functionality


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/94f0349c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/94f0349c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/94f0349c

Branch: refs/heads/USERGRID-903
Commit: 94f0349c8bc064a5b5aa8609a486668e519acc20
Parents: 0f94609
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 5 10:21:03 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 5 10:21:03 2015 -0600

----------------------------------------------------------------------
 .../PerformanceEntityRebuildIndexTest.java      | 385 --------------
 .../usergrid/persistence/RebuildIndexTest.java  | 515 +++++++++++++++++++
 2 files changed, 515 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94f0349c/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
deleted file mode 100644
index 9fcf6d7..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence;
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.lang.RandomStringUtils;
-
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-
-import com.codahale.metrics.MetricRegistry;
-import com.google.inject.Injector;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-
-//@RunWith(JukitoRunner.class)
-//@UseModules({ GuiceModule.class })
-
-
-public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
-    private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class );
-
-    private static final MetricRegistry registry = new MetricRegistry();
-
-
-    private static final int ENTITIES_TO_INDEX = 1000;
-
-
-    @Before
-    public void startReporting() {
-
-        logger.debug( "Starting metrics reporting" );
-    }
-
-
-    @After
-    public void printReport() {
-        logger.debug( "Printing metrics report" );
-    }
-
-
-    @Test( timeout = 120000 )
-    public void rebuildOneCollectionIndex() throws Exception {
-
-        logger.info( "Started rebuildIndex()" );
-
-        String rand = RandomStringUtils.randomAlphanumeric( 5 );
-        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
-
-        final EntityManager em = setup.getEmf().getEntityManager( appId );
-
-        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
-
-        // ----------------- create a bunch of entities
-
-        Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put( "key1", 1000 );
-            put( "key2", 2000 );
-            put( "key3", "Some value" );
-        }};
-
-
-        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int herderCount = 0;
-        int shepardCount = 0;
-        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
-
-            final Entity entity;
-
-            try {
-                entityMap.put( "key", i );
-
-                if ( i % 2 == 0 ) {
-                    entity = em.create( "catherder", entityMap );
-                    herderCount++;
-                }
-                else {
-                    entity = em.create( "catshepard", entityMap );
-                    shepardCount++;
-                }
-            }
-            catch ( Exception ex ) {
-                throw new RuntimeException( "Error creating entity", ex );
-            }
-
-            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
-            if ( i % 10 == 0 ) {
-                logger.info( "Created {} entities", i );
-            }
-        }
-
-        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
-
-        // ----------------- test that we can read them, should work fine
-
-        logger.debug( "Read the data" );
-        readData( em, "catherders", herderCount, 0 );
-        readData( em, "catshepards", shepardCount, 0 );
-
-        // ----------------- delete the system and application indexes
-
-        logger.debug( "Deleting apps" );
-        deleteIndex( em.getApplicationId() );
-
-        // ----------------- test that we can read them, should fail
-
-        logger.debug( "Reading data, should fail this time " );
-
-        //should be no data
-        readData( em, "testTypes", 0, 0 );
-
-
-        //        ----------------- rebuild index for catherders only
-
-        logger.debug( "Preparing to rebuild all indexes" );
-
-
-        final ReIndexRequestBuilder builder =
-            reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
-
-        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
-
-        assertNotNull( status.getJobId(), "JobId is present" );
-
-        logger.info( "Rebuilt index" );
-
-
-        waitForRebuild( status, reIndexService );
-
-
-        // ----------------- test that we can read the catherder collection and not the catshepard
-
-        readData( em, "catherders", herderCount, 0 );
-        readData( em, "catshepards", 0, 0 );
-    }
-
-
-    @Test( timeout = 120000 )
-    public void rebuildIndex() throws Exception {
-
-        logger.info( "Started rebuildIndex()" );
-
-        String rand = RandomStringUtils.randomAlphanumeric( 5 );
-        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
-
-        final EntityManager em = setup.getEmf().getEntityManager( appId );
-
-        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
-
-        // ----------------- create a bunch of entities
-
-        Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put( "key1", 1000 );
-            put( "key2", 2000 );
-            put( "key3", "Some value" );
-        }};
-        Map<String, Object> cat1map = new HashMap<String, Object>() {{
-            put( "name", "enzo" );
-            put( "color", "orange" );
-        }};
-        Map<String, Object> cat2map = new HashMap<String, Object>() {{
-            put( "name", "marquee" );
-            put( "color", "grey" );
-        }};
-        Map<String, Object> cat3map = new HashMap<String, Object>() {{
-            put( "name", "bertha" );
-            put( "color", "tabby" );
-        }};
-
-        Entity cat1 = em.create( "cat", cat1map );
-        Entity cat2 = em.create( "cat", cat2map );
-        Entity cat3 = em.create( "cat", cat3map );
-
-        List<EntityRef> entityRefs = new ArrayList<>();
-
-        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
-
-            final Entity entity;
-
-            try {
-                entityMap.put( "key", i );
-                entity = em.create( "testType", entityMap );
-
-
-                em.createConnection( entity, "herds", cat1 );
-                em.createConnection( entity, "herds", cat2 );
-                em.createConnection( entity, "herds", cat3 );
-            }
-            catch ( Exception ex ) {
-                throw new RuntimeException( "Error creating entity", ex );
-            }
-
-            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
-            if ( i % 10 == 0 ) {
-                logger.info( "Created {} entities", i );
-            }
-        }
-
-        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
-        app.refreshIndex();
-
-        // ----------------- test that we can read them, should work fine
-
-        logger.debug( "Read the data" );
-        final String collectionName = "testtypes";
-        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
-
-        // ----------------- delete the system and application indexes
-
-        logger.debug( "Deleting app index" );
-
-        deleteIndex( em.getApplicationId() );
-
-        // ----------------- test that we can read them, should fail
-
-        // deleting sytem app index will interfere with other concurrently running tests
-        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
-
-        // ----------------- test that we can read them, should fail
-
-        logger.debug( "Reading data, should fail this time " );
-
-        readData( em, collectionName, 0, 0 );
-
-
-
-        // ----------------- rebuild index
-
-        logger.debug( "Preparing to rebuild all indexes" );
-        ;
-
-
-        try {
-
-            final ReIndexRequestBuilder builder =
-                reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
-
-            ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
-
-            assertNotNull( status.getJobId(), "JobId is present" );
-
-            logger.info( "Rebuilt index" );
-
-
-            waitForRebuild( status, reIndexService );
-
-
-            logger.info( "Rebuilt index" );
-
-            app.refreshIndex();
-        }
-        catch ( Exception ex ) {
-            logger.error( "Error rebuilding index", ex );
-            fail();
-        }
-
-        // ----------------- test that we can read them
-
-        Thread.sleep( 2000 );
-        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
-    }
-
-
-    /**
-     * Wait for the rebuild to occur
-     */
-    private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
-        throws InterruptedException {
-        while ( true ) {
-
-            try {
-                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
-
-                if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
-                    break;
-                }
-            }
-            catch ( IllegalArgumentException iae ) {
-                //swallow.  Thrown if our job can't be found.  I.E hasn't updated yet
-            }
-
-
-            Thread.sleep( 1000 );
-        }
-    }
-
-
-    /**
-     * Delete app index
-     */
-    private void deleteIndex( UUID appUuid ) {
-
-        Injector injector = SpringResource.getInstance().getBean( Injector.class );
-        IndexLocationStrategyFactory indexLocationStrategyFactory = injector.getInstance(IndexLocationStrategyFactory.class);
-        EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
-
-        Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
-        ApplicationScope scope = new ApplicationScopeImpl( appId );
-        EntityIndex ei = eif.createEntityIndex(
-            indexLocationStrategyFactory.getIndexLocationStrategy(scope)
-        );
-
-        ei.deleteApplication().toBlocking().lastOrDefault( null );
-        app.refreshIndex();
-    }
-
-
-    private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
-        throws Exception {
-
-        app.refreshIndex();
-
-        Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
-        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
-
-        int count = 0;
-        while ( true ) {
-
-            for ( Entity e : results.getEntities() ) {
-
-                assertEquals( 2000, e.getProperty( "key2" ) );
-
-                Results catResults =
-                    em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
-                assertEquals( expectedConnections, catResults.size() );
-
-                if ( count % 100 == 0 ) {
-                    logger.info( "read {} entities", count );
-                }
-                count++;
-            }
-
-            if ( results.hasCursor() ) {
-                logger.info( "Counted {} : query again with cursor", count );
-                q.setCursor( results.getCursor() );
-                results = em.searchCollection( em.getApplicationRef(), collectionName, q );
-            }
-            else {
-                break;
-            }
-        }
-
-        assertEquals("Did not get expected entities", expectedEntities, count);
-        return count;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/94f0349c/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
new file mode 100644
index 0000000..cea6ada
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/RebuildIndexTest.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.inject.Injector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+//@RunWith(JukitoRunner.class)
+//@UseModules({ GuiceModule.class })
+
+
+public class RebuildIndexTest extends AbstractCoreIT {
+    private static final Logger logger = LoggerFactory.getLogger( RebuildIndexTest.class );
+
+    private static final MetricRegistry registry = new MetricRegistry();
+
+
+    private static final int ENTITIES_TO_INDEX = 1000;
+
+
+    @Before
+    public void startReporting() {
+
+        logger.debug( "Starting metrics reporting" );
+    }
+
+
+    @After
+    public void printReport() {
+        logger.debug( "Printing metrics report" );
+    }
+
+
+    @Test( timeout = 120000 )
+    public void rebuildOneCollectionIndex() throws Exception {
+
+        logger.info( "Started rebuildIndex()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
+
+        // ----------------- create a bunch of entities
+
+        Map<String, Object> entityMap = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
+        }};
+
+
+        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
+        int herderCount = 0;
+        int shepardCount = 0;
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", i );
+
+                if ( i % 2 == 0 ) {
+                    entity = em.create( "catherder", entityMap );
+                    herderCount++;
+                }
+                else {
+                    entity = em.create( "catshepard", entityMap );
+                    shepardCount++;
+                }
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
+        }
+
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
+        app.refreshIndex();
+
+        // ----------------- test that we can read them, should work fine
+
+        logger.debug( "Read the data" );
+        readData( em, "catherders", herderCount, 0 );
+        readData( em, "catshepards", shepardCount, 0 );
+
+        // ----------------- delete the system and application indexes
+
+        logger.debug( "Deleting apps" );
+        deleteIndex( em.getApplicationId() );
+
+        // ----------------- test that we can read them, should fail
+
+        logger.debug( "Reading data, should fail this time " );
+
+        //should be no data
+        readData( em, "testTypes", 0, 0 );
+
+
+        //        ----------------- rebuild index for catherders only
+
+        logger.debug( "Preparing to rebuild all indexes" );
+
+
+        final ReIndexRequestBuilder builder =
+            reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+
+        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+
+        assertNotNull( status.getJobId(), "JobId is present" );
+
+        logger.info( "Rebuilt index" );
+
+
+        waitForRebuild( status, reIndexService );
+
+
+        // ----------------- test that we can read the catherder collection and not the catshepard
+
+        readData( em, "catherders", herderCount, 0 );
+        readData( em, "catshepards", 0, 0 );
+    }
+
+
+    @Test( timeout = 120000 )
+    public void rebuildIndex() throws Exception {
+
+        logger.info( "Started rebuildIndex()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
+
+        // ----------------- create a bunch of entities
+
+        Map<String, Object> entityMap = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
+        }};
+        Map<String, Object> cat1map = new HashMap<String, Object>() {{
+            put( "name", "enzo" );
+            put( "color", "orange" );
+        }};
+        Map<String, Object> cat2map = new HashMap<String, Object>() {{
+            put( "name", "marquee" );
+            put( "color", "grey" );
+        }};
+        Map<String, Object> cat3map = new HashMap<String, Object>() {{
+            put( "name", "bertha" );
+            put( "color", "tabby" );
+        }};
+
+        Entity cat1 = em.create( "cat", cat1map );
+        Entity cat2 = em.create( "cat", cat2map );
+        Entity cat3 = em.create( "cat", cat3map );
+
+        List<EntityRef> entityRefs = new ArrayList<>();
+
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
+
+            final Entity entity;
+
+            try {
+                entityMap.put( "key", i );
+                entity = em.create( "testType", entityMap );
+
+
+                em.createConnection( entity, "herds", cat1 );
+                em.createConnection( entity, "herds", cat2 );
+                em.createConnection( entity, "herds", cat3 );
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
+            }
+
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
+        }
+
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
+        app.refreshIndex();
+
+        // ----------------- test that we can read them, should work fine
+
+        logger.debug( "Read the data" );
+        final String collectionName = "testtypes";
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+
+        // ----------------- delete the system and application indexes
+
+        logger.debug( "Deleting app index" );
+
+        deleteIndex( em.getApplicationId() );
+
+        // ----------------- test that we can read them, should fail
+
+        // deleting system app index will interfere with other concurrently running tests
+        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+
+        // ----------------- test that we can read them, should fail
+
+        logger.debug( "Reading data, should fail this time " );
+
+        readData( em, collectionName, 0, 0 );
+
+
+
+        // ----------------- rebuild index
+
+        logger.debug( "Preparing to rebuild all indexes" );
+        ;
+
+
+        try {
+
+            final ReIndexRequestBuilder builder =
+                reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
+
+            ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+
+            assertNotNull( status.getJobId(), "JobId is present" );
+
+            logger.info( "Rebuilt index" );
+
+
+            waitForRebuild( status, reIndexService );
+
+
+            logger.info( "Rebuilt index" );
+
+            app.refreshIndex();
+        }
+        catch ( Exception ex ) {
+            logger.error( "Error rebuilding index", ex );
+            fail();
+        }
+
+        // ----------------- test that we can read them
+
+        Thread.sleep( 2000 );
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+    }
+
+
+
+    @Test( timeout = 120000 )
+    public void rebuildUpdatedSince() throws Exception {
+
+        logger.info( "Started rebuildIndex()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
+
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
+
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
+
+        // ----------------- create a bunch of entities
+
+
+        Map<String, Object> entityData = new HashMap<String, Object>() {{
+            put( "key1", 1000 );
+        }};
+
+
+        final Entity firstEntity = em.create( "thing", entityData );
+
+
+        final Entity secondEntity = em.create( "thing",  entityData);
+
+        app.refreshIndex();
+
+        // ----------------- test that we can read them, should work fine
+
+        logger.debug( "Read the data" );
+        final String collectionName = "things";
+
+        countEntities( em, collectionName, 2 );
+
+        // ----------------- delete the system and application indexes
+
+        logger.debug( "Deleting app index" );
+
+        deleteIndex( em.getApplicationId() );
+
+        // ----------------- test that we can read them, should fail
+
+        // deleting system app index will interfere with other concurrently running tests
+        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+
+        // ----------------- test that we can read them, should fail
+
+        logger.debug( "Reading data, should fail this time " );
+
+        countEntities( em, collectionName, 0);
+
+
+
+        // ----------------- rebuild index
+
+        final long firstUpdatedTimestamp = firstEntity.getModified();
+        final long secondUpdatedTimestamp = secondEntity.getModified();
+
+        assertTrue( "second should be updated after first", firstUpdatedTimestamp < secondUpdatedTimestamp );
+
+
+        try {
+
+
+            final long updatedTimestamp = secondEntity.getModified();
+
+
+            logger.debug( "Preparing to rebuild all indexes with timestamp {}", updatedTimestamp );
+
+            //set our update timestamp
+            final ReIndexRequestBuilder builder =
+                reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withStartTimestamp(
+                    updatedTimestamp );
+
+            ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+
+            assertNotNull( status.getJobId(), "JobId is present" );
+
+            logger.info( "Rebuilt index" );
+
+            waitForRebuild( status, reIndexService );
+
+            logger.info( "Rebuilt index" );
+
+            app.refreshIndex();
+        }
+        catch ( Exception ex ) {
+            logger.error( "Error rebuilding index", ex );
+            fail();
+        }
+
+        // ----------------- test that we can read them
+
+        Thread.sleep( 2000 );
+        countEntities( em, collectionName, 1 );
+    }
+
+
+    /**
+     * Wait for the rebuild to occur
+     */
+    private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
+        throws InterruptedException {
+        while ( true ) {
+
+            try {
+                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
+
+                if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+                    break;
+                }
+            }
+            catch ( IllegalArgumentException iae ) {
+                //swallow.  Thrown if our job can't be found, i.e. it hasn't been updated yet
+            }
+
+
+            Thread.sleep( 1000 );
+        }
+    }
+
+
+    /**
+     * Delete app index
+     */
+    private void deleteIndex( UUID appUuid ) {
+
+        Injector injector = SpringResource.getInstance().getBean( Injector.class );
+        IndexLocationStrategyFactory indexLocationStrategyFactory = injector.getInstance(IndexLocationStrategyFactory.class);
+        EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
+
+        Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
+        ApplicationScope scope = new ApplicationScopeImpl( appId );
+        EntityIndex ei = eif.createEntityIndex(
+            indexLocationStrategyFactory.getIndexLocationStrategy(scope)
+        );
+
+        ei.deleteApplication().toBlocking().lastOrDefault( null );
+        app.refreshIndex();
+    }
+
+
+    /**
+     * Reads back every entity in the collection matching {@code key1=1000} and verifies each one.
+     *
+     * For each entity returned, asserts that its {@code key2} property equals 2000 and that a search
+     * of its "herds" connections returns {@code expectedConnections} results. Results are paged with
+     * the query cursor until no cursor remains; the total number of entities seen must equal
+     * {@code expectedEntities}.
+     *
+     * @param em entity manager used for the searches
+     * @param collectionName name of the collection to query
+     * @param expectedEntities number of entities expected in total; also passed to the first, consistent search
+     * @param expectedConnections number of "herds" connection results expected for every entity
+     * @return the number of entities read (equal to {@code expectedEntities} when the final assertion passes)
+     * @throws Exception propagated from the entity manager calls
+     */
+    private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
+        throws Exception {
+
+        app.refreshIndex();
+
+        Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+        int count = 0;
+        while ( true ) {
+
+            for ( Entity e : results.getEntities() ) {
+
+                assertEquals( 2000, e.getProperty( "key2" ) );
+
+                Results catResults =
+                    em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
+                assertEquals( expectedConnections, catResults.size() );
+
+                //progress log every 100 entities
+                if ( count % 100 == 0 ) {
+                    logger.info( "read {} entities", count );
+                }
+                count++;
+            }
+
+            if ( results.hasCursor() ) {
+                logger.info( "Counted {} : query again with cursor", count );
+                //follow-up pages reuse the same query with the cursor set and use the plain
+                //searchCollection call rather than the consistent variant used for the first page
+                q.setCursor( results.getCursor() );
+                results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+            }
+            else {
+                break;
+            }
+        }
+
+        assertEquals( "Did not get expected entities", expectedEntities, count );
+        return count;
+    }
+
+    /**
+     * Counts the entities in the collection matching {@code key1=1000} without inspecting them.
+     *
+     * Adds up {@code results.size()} for every page, following the query cursor until none remains,
+     * and asserts that the total equals {@code expectedEntities}.
+     *
+     * @param em entity manager used for the searches
+     * @param collectionName name of the collection to query
+     * @param expectedEntities number of entities expected in total; also passed to the first, consistent search
+     * @return the number of entities counted (equal to {@code expectedEntities} when the final assertion passes)
+     * @throws Exception propagated from the entity manager calls
+     */
+    private int countEntities( EntityManager em, String collectionName, int expectedEntities)
+           throws Exception {
+
+           app.refreshIndex();
+
+           Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+           Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
+
+           int count = 0;
+           while ( true ) {
+
+               //each page contributes its result count; the entities themselves are not examined
+               count += results.size();
+
+
+               if ( results.hasCursor() ) {
+                   logger.info( "Counted {} : query again with cursor", count );
+                   //follow-up pages reuse the same query with the cursor set
+                   q.setCursor( results.getCursor() );
+                   results = em.searchCollection( em.getApplicationRef(), collectionName, q );
+               }
+               else {
+                   break;
+               }
+           }
+
+           assertEquals( "Did not get expected entities", expectedEntities, count );
+           return count;
+       }
+
+
+}


[06/13] incubator-usergrid git commit: Merge commit '0f94609eabc5e563428b51590da93c61b397dd3e' into two-dot-o-dev

Posted by gr...@apache.org.
Merge commit '0f94609eabc5e563428b51590da93c61b397dd3e' into two-dot-o-dev

* commit '0f94609eabc5e563428b51590da93c61b397dd3e':
  Fixes fat finger
  Upgrades Jackson to fix this issue.
  Fixes runtime bug with joda time conflict with Astayanx
  Updates amazon async service wiring to wire to event builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9427b4ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9427b4ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9427b4ce

Branch: refs/heads/USERGRID-903
Commit: 9427b4ce47f389e517231b97c09c588f0cbf18f4
Parents: ba5c491 0f94609
Author: GERey <gr...@apigee.com>
Authored: Tue Aug 4 16:34:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Tue Aug 4 16:34:15 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 261 ++++++++++---------
 .../asyncevents/AsyncIndexProvider.java         |  11 +-
 .../asyncevents/EventBuilder.java               |  11 +-
 .../asyncevents/EventBuilderImpl.java           |  21 +-
 .../asyncevents/InMemoryAsyncEventService.java  |  14 +-
 .../asyncevents/model/AsyncEvent.java           | 153 ++---------
 .../asyncevents/model/EdgeDeleteEvent.java      |  36 ++-
 .../asyncevents/model/EdgeIndexEvent.java       |  39 ++-
 .../asyncevents/model/EntityDeleteEvent.java    |  18 +-
 .../asyncevents/model/EntityIndexEvent.java     |  30 ++-
 .../model/InitializeApplicationIndexEvent.java  |  21 +-
 .../index/AmazonAsyncEventServiceTest.java      |   8 +-
 .../index/AsyncIndexServiceTest.java            |  14 +-
 stack/corepersistence/pom.xml                   |   3 +-
 .../index/IndexLocationStrategy.java            |   4 +-
 stack/corepersistence/queue/pom.xml             |  20 +-
 .../queue/util/AmazonNotificationUtils.java     | 230 ++++++++--------
 17 files changed, 453 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9427b4ce/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------


[04/13] incubator-usergrid git commit: Upgrades Jackson to fix this issue.

Posted by gr...@apache.org.
Upgrades Jackson to fix this issue.

https://github.com/FasterXML/jackson-databind/issues/656

Upgrades events to be polymorphic to allow for easy addition and mapping of events without 1 large class for all events


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/491008e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/491008e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/491008e9

Branch: refs/heads/USERGRID-903
Commit: 491008e961ac893f734d12352af2ac040b1ea1fe
Parents: 29d115f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 4 17:02:53 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 4 17:02:53 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 145 ++++++++++--------
 .../asyncevents/EventBuilderImpl.java           |   9 +-
 .../asyncevents/model/AsyncEvent.java           | 153 +++----------------
 .../asyncevents/model/EdgeDeleteEvent.java      |  36 ++++-
 .../asyncevents/model/EdgeIndexEvent.java       |  39 ++++-
 .../asyncevents/model/EntityDeleteEvent.java    |  18 ++-
 .../asyncevents/model/EntityIndexEvent.java     |  18 ++-
 .../model/InitializeApplicationIndexEvent.java  |  21 ++-
 .../index/AmazonAsyncEventServiceTest.java      |   3 -
 .../index/AsyncIndexServiceTest.java            |  14 +-
 stack/corepersistence/pom.xml                   |   3 +-
 .../index/IndexLocationStrategy.java            |   4 +-
 12 files changed, 222 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 0429af3..85aecdf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,20 +25,20 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
-import com.codahale.metrics.Histogram;
-import com.google.common.base.Preconditions;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.*;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -46,6 +46,10 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -56,7 +60,9 @@ import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -211,43 +217,42 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     }
 
-    private void handleMessages(final List<QueueMessage> messages) {
-        if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());
 
-        for (QueueMessage message : messages) {
-            final AsyncEvent event = (AsyncEvent) message.getBody();
-
-            if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());
-
-            if (event == null || event.getEventType() == null) {
-                logger.error("AsyncEvent type or event is null!");
-            } else {
-                switch (event.getEventType()) {
+    private void handleMessages( final List<QueueMessage> messages ) {
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "handleMessages with {} message", messages.size() );
+        }
 
-                    case EDGE_DELETE:
-                        handleEdgeDelete(message);
-                        break;
+        for ( QueueMessage message : messages ) {
+            final AsyncEvent event = ( AsyncEvent ) message.getBody();
 
-                    case EDGE_INDEX:
-                        handleEdgeIndex(message);
-                        break;
+            logger.debug( "Processing {} event", event );
 
-                    case ENTITY_DELETE:
-                        handleEntityDelete(message);
-                        break;
+            if ( event == null ) {
+                logger.error( "AsyncEvent type or event is null!" );
+                continue;
+            }
 
-                    case ENTITY_INDEX:
-                        handleEntityIndexUpdate(message);
-                        break;
 
-                    case APPLICATION_INDEX:
-                        handleInitializeApplicationIndex(message);
-                        break;
+            if ( event instanceof EdgeDeleteEvent ) {
+                handleEdgeDelete( message );
+            }
+            else if ( event instanceof EdgeIndexEvent ) {
+                handleEdgeIndex( message );
+            }
 
-                    default:
-                        logger.error("Unknown EventType: {}", event.getEventType());
+            else if ( event instanceof EntityDeleteEvent ) {
+                handleEntityDelete( message );
+            }
+            else if ( event instanceof EntityIndexEvent ) {
+                handleEntityIndexUpdate( message );
+            }
 
-                }
+            else if ( event instanceof InitializeApplicationIndexEvent ) {
+                handleInitializeApplicationIndex( message );
+            }
+            else {
+                logger.error( "Unknown EventType: {}", event );
             }
 
             messageCycle.update( System.currentTimeMillis() - event.getCreationTime() );
@@ -257,7 +262,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
-        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
+            applicationScope );
         offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
     }
 
@@ -272,19 +278,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     public void handleEntityIndexUpdate(final QueueMessage message) {
 
-        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
+        Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
 
-        final EntityIndexEvent event = (EntityIndexEvent) message.getBody();
+        final AsyncEvent event = ( AsyncEvent ) message.getBody();
 
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType()));
+        Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
+
+        final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
+
 
         //process the entity immediately
         //only process the same version, otherwise ignore
-        final EntityIdScope entityIdScope = event.getEntityIdScope();
+        final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
         final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
         final Id entityId = entityIdScope.getId();
-        final long updatedAfter = event.getUpdatedAfter();
+        final long updatedAfter = entityIndexEvent.getUpdatedAfter();
 
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
 
@@ -310,17 +319,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType()));
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
+        Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
+
+        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
 
-        final ApplicationScope applicationScope = event.getApplicationScope();
-        final Edge edge = event.getEdge();
+        final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+        final Edge edge = edgeIndexEvent.getEdge();
 
 
 
         final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
 
-        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
+        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge(
             applicationScope, entity, edge ) );
 
         subscibeAndAck( edgeIndexObservable, message );
@@ -339,11 +350,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
 
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType()));
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
+        Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
 
-        final ApplicationScope applicationScope = event.getApplicationScope();
-        final Edge edge = event.getEdge();
+
+        final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
+
+        final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
+        final Edge edge = edgeIndexEvent.getEdge();
 
         if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
 
@@ -364,12 +378,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete");
-        Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
-            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) );
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
+        Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+            String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
 
-        final ApplicationScope applicationScope = event.getApplicationScope();
-        final Id entityId = event.getEntityId();
+
+        final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+        final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+        final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
 
         if (logger.isDebugEnabled())
             logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
@@ -391,10 +407,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
 
         final AsyncEvent event = (AsyncEvent) message.getBody();
-        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
-        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
+        Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" );
+        Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+        final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+            ( InitializeApplicationIndexEvent ) event;
 
-        final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
+        final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
         final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
         index.initialize();
         ack( message );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 46cec2e..4bf5695 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -174,8 +174,15 @@ public class EventBuilderImpl implements EventBuilder {
             entity -> {
                 final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
 
+                /**
+                 * We don't have a modified field, so we can't check, pass it through
+                 */
+                if ( modified == null ) {
+                    return true;
+                }
+
                 //only re-index if it has been updated and been updated after our timestamp
-                return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+                return  modified.getValue() >= entityIndexOperation.getUpdatedSince();
             } )
             //perform indexing on the task scheduler and start it
             .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 3d22986..6b45297 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -19,155 +19,42 @@
 
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
+
+import java.io.Serializable;
+
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.model.entity.Id;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
-import java.io.Serializable;
 
 /**
- * Created by Jeff West on 5/25/15.
+ * Marker class for serialization
  */
 
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AsyncEvent implements Serializable {
-
-    @JsonProperty
-    protected IndexLocationStrategy indexLocationStrategy;
-
-    @JsonProperty
-    protected EventType eventType;
-
-    @JsonProperty
-    protected EntityIdScope entityIdScope;
-
-    @JsonProperty
-    protected ApplicationScope applicationScope;
-
-    @JsonProperty
-    protected Id entityId;
+@JsonIgnoreProperties( ignoreUnknown = true )
+@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
+@JsonSubTypes( {
+    @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ),
+    @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
+    @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
+    @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
+    @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+} )
 
-    @JsonProperty
-    protected Edge edge;
+public abstract class AsyncEvent implements Serializable {
 
     @JsonProperty
     protected long creationTime;
 
-    /**
-     * required for jackson, do not delete
-     */
 
+    //set by default, will be overridden when de-serializing
     protected AsyncEvent() {
+        creationTime = System.currentTimeMillis();
     }
 
-    public AsyncEvent(final EventType eventType) {
-
-        this.eventType = eventType;
-        this.creationTime = System.currentTimeMillis();
-    }
-
-    public AsyncEvent(final EventType eventType,
-                      final EntityIdScope entityIdScope) {
-
-        this.eventType = eventType;
-        this.entityIdScope = entityIdScope;
-        this.creationTime = System.currentTimeMillis();
-    }
-
-    public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
-        this.eventType = eventType;
-        this.indexLocationStrategy = indexLocationStrategy;
-        this.creationTime = System.currentTimeMillis();
-    }
-
-    public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
-        this.eventType = eventType;
-        this.applicationScope = applicationScope;
-        this.edge = edge;
-        this.creationTime = System.currentTimeMillis();
-    }
-
-    public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Id entityId, Edge edge) {
-        this.eventType = eventType;
-        this.applicationScope = applicationScope;
-        this.edge = edge;
-        this.entityId = entityId;
-        this.creationTime = System.currentTimeMillis();
-    }
-
-    @JsonSerialize()
-    public final Id getEntityId() {
-        return entityId;
-    }
-
-    protected void setEntityId(Id entityId) {
-        this.entityId = entityId;
-    }
-
-    @JsonSerialize()
-    public final EventType getEventType() {
-        return eventType;
-    }
-
-    protected void setEventType(EventType eventType) {
-        this.eventType = eventType;
-    }
-
-    @JsonSerialize()
-    public EntityIdScope getEntityIdScope() {
-        return entityIdScope;
-    }
-
-    protected void setEntityIdScope(EntityIdScope entityIdScope) {
-        this.entityIdScope = entityIdScope;
-    }
-
-    @JsonSerialize()
-    public ApplicationScope getApplicationScope() {
-        return applicationScope;
-    }
-
-    protected void setApplicationScope(ApplicationScope applicationScope) {
-        this.applicationScope = applicationScope;
-    }
-
-    @JsonSerialize()
-    @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class)
-    public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
-
-    protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
-        this.indexLocationStrategy = indexLocationStrategy;
-    }
-
-    @JsonSerialize()
-    public Edge getEdge() {
-        return edge;
-    }
-
-    @JsonSerialize()
-    public long getCreationTime() {  return creationTime; }
-
-    protected void setEdge(Edge edge) {
-        this.edge = edge;
-    }
-
-    public enum EventType {
-        EDGE_DELETE,
-        EDGE_INDEX,
-        ENTITY_DELETE,
-        ENTITY_INDEX,
-        APPLICATION_INDEX;
-
 
-        public String asString() {
-            return toString();
-        }
+    public long getCreationTime() {
+        return creationTime;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
index 3af9818..af16bac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java
@@ -19,19 +19,41 @@
 
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+
 public final class EdgeDeleteEvent extends AsyncEvent {
+
+    @JsonProperty
+    protected ApplicationScope applicationScope;
+
+
+    @JsonProperty
+    protected Edge edge;
+
+
     public EdgeDeleteEvent() {
     }
 
-    public EdgeDeleteEvent(ApplicationScope applicationScope, Edge edge) {
-        super(EventType.EDGE_DELETE, applicationScope, edge);
+
+    public EdgeDeleteEvent( ApplicationScope applicationScope, Edge edge ) {
+        this.applicationScope = applicationScope;
+        this.edge = edge;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public Edge getEdge() {
+        return edge;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
index cd0118f..c89b828 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -28,13 +30,19 @@ import org.apache.usergrid.persistence.model.entity.Id;
 
 import java.io.Serializable;
 
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
 public final class EdgeIndexEvent
-    extends AsyncEvent
-    implements Serializable {
+    extends AsyncEvent {
+
+
+    @JsonProperty
+    protected ApplicationScope applicationScope;
+
+    @JsonProperty
+    protected Id entityId;
+
+    @JsonProperty
+    protected Edge edge;
 
     /**
      * Needed by jackson
@@ -43,6 +51,23 @@ public final class EdgeIndexEvent
     }
 
     public EdgeIndexEvent(ApplicationScope applicationScope, Id entityId, Edge edge) {
-        super(EventType.EDGE_INDEX, applicationScope, entityId, edge);
+        this.applicationScope = applicationScope;
+        this.entityId = entityId;
+        this.edge = edge;
+    }
+
+
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+
+    public Edge getEdge() {
+        return edge;
+    }
+
+
+    public Id getEntityId() {
+        return entityId;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
index 606deae..847a07d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java
@@ -19,20 +19,28 @@
 
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
 public final class EntityDeleteEvent extends AsyncEvent {
+
+
+    @JsonProperty
+    protected EntityIdScope entityIdScope;
+
     public EntityDeleteEvent() {
     }
 
     public EntityDeleteEvent(EntityIdScope entityIdScope) {
-        super(EventType.ENTITY_DELETE, entityIdScope);
+        this.entityIdScope =  entityIdScope;
+    }
+
+
+    public EntityIdScope getEntityIdScope() {
+        return entityIdScope;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
index 81961a0..a04326a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java
@@ -19,22 +19,26 @@
 
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 
-/**
- * Created by Jeff West on 5/25/15.
- */
-@JsonDeserialize(as = AsyncEvent.class)
+
 public final class EntityIndexEvent extends AsyncEvent {
 
+
+    @JsonProperty
+    protected EntityIdScope entityIdScope;
+
+    @JsonProperty
     private long updatedAfter;
 
     public EntityIndexEvent() {
     }
 
     public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) {
-        super(EventType.ENTITY_INDEX, entityIdScope);
+        this.entityIdScope = entityIdScope;
         this.updatedAfter = updatedAfter;
     }
 
@@ -44,7 +48,7 @@ public final class EntityIndexEvent extends AsyncEvent {
     }
 
 
-    public void setUpdatedAfter( long updatedAfter ) {
-        this.updatedAfter = updatedAfter;
+    public EntityIdScope getEntityIdScope() {
+        return entityIdScope;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 8b20651..68f0113 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -19,20 +19,31 @@
  */
 package org.apache.usergrid.corepersistence.asyncevents.model;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 
 /**
  * event to init app index
  */
-@JsonDeserialize(as = AsyncEvent.class)
+
 public class InitializeApplicationIndexEvent extends AsyncEvent {
-    public InitializeApplicationIndexEvent() {
-        super(EventType.APPLICATION_INDEX);
-    }
+
+
+    @JsonProperty
+    protected IndexLocationStrategy indexLocationStrategy;
+
     public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
-        super(EventType.APPLICATION_INDEX, indexLocationStrategy);
+        this.indexLocationStrategy = indexLocationStrategy;
+
+    }
+
 
+    public IndexLocationStrategy getIndexLocationStrategy() {
+        return indexLocationStrategy;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index d37701b..4660389 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -67,9 +67,6 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     public MetricsFactory metricsFactory;
 
     @Inject
-    public IndexService indexService;
-
-    @Inject
     public RxTaskScheduler rxTaskScheduler;
 
     @Inject

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index 9b104fc..d34a1a9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -131,7 +131,7 @@ public abstract class AsyncIndexServiceTest {
 
 
         /**
-         * Write 10k edges 10 at a time in parallel
+         * Write 500 edges
          */
 
 
@@ -139,19 +139,19 @@ public abstract class AsyncIndexServiceTest {
             final Id connectingId = createId("connecting");
             final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId());
 
-            return graphManager.writeEdge(edge).subscribeOn(Schedulers.io());
+            return graphManager.writeEdge( edge );
         }).toList().toBlocking().last();
 
 
+        //queue up processing
         asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity );
 
-        emf.refreshIndex(applicationScope.getApplication().getUuid());
-
-        //        Thread.sleep( 1000000000000l );
 
         final EntityIndex EntityIndex =
             entityIndexFactory.createEntityIndex( indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
 
+        emf.refreshIndex(applicationScope.getApplication().getUuid());
+
         final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
 
         //query until it's available
@@ -176,13 +176,13 @@ public abstract class AsyncIndexServiceTest {
     }
 
 
-    private CandidateResults getResults( final EntityIndex EntityIndex,
+    private CandidateResults getResults( final EntityIndex entityIndex,
                                          final SearchEdge searchEdge, final SearchTypes searchTypes, final int expectedSize, final int attempts ) {
 
 
         for ( int i = 0; i < attempts; i++ ) {
             final CandidateResults candidateResults =
-                EntityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
+                entityIndex.search( searchEdge, searchTypes, "select *", 100, 0 );
 
             if ( candidateResults.size() == expectedSize ) {
                 return candidateResults;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index b95f3e1..1ee5272 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -65,8 +65,7 @@ limitations under the License.
         <guice.version>4.0-beta5</guice.version>
         <guicyfig.version>3.2</guicyfig.version>
         <hystrix.version>1.4.0</hystrix.version>
-        <jackson-2-version>2.4.1</jackson-2-version>
-        <jackson-smile.verson>2.4.3</jackson-smile.verson>
+        <jackson-2-version>2.6.0</jackson-2-version>
         <mockito.version>1.10.8</mockito.version>
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/491008e9/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
index 6b07b9e..e5c8f8f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
@@ -19,6 +19,8 @@
  */
 package org.apache.usergrid.persistence.index;
 
+import java.io.Serializable;
+
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 /**
  * location strategy for index
@@ -33,7 +35,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
  applicationIndexName = {indexRoot}_applications_{bucketId}
  applicationAliasName = {indexRoot}_{appId}_read_alias || {indexRoot}_{appId}_write_alias
  */
-public interface IndexLocationStrategy {
+public interface IndexLocationStrategy extends Serializable {
     /**
      * get the alias name
      * @return


[10/13] incubator-usergrid git commit: Fixes jackson issue with long fields and serialization that is broken in 2.6.0

Posted by gr...@apache.org.
Fixes jackson issue with long fields and serialization that is broken in 2.6.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3cb3f6c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3cb3f6c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3cb3f6c0

Branch: refs/heads/USERGRID-903
Commit: 3cb3f6c08257277f5fec48eb43dd42156980e41d
Parents: 94f0349
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 5 12:07:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 5 12:07:43 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3cb3f6c0/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 1ee5272..4b47bc0 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -65,7 +65,7 @@ limitations under the License.
         <guice.version>4.0-beta5</guice.version>
         <guicyfig.version>3.2</guicyfig.version>
         <hystrix.version>1.4.0</hystrix.version>
-        <jackson-2-version>2.6.0</jackson-2-version>
+        <jackson-2-version>2.4.1</jackson-2-version>
         <mockito.version>1.10.8</mockito.version>
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>