Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/23 22:16:24 UTC

[01/18] usergrid git commit: Adds strong consistency read to maps. Persists ES batches into Cassandra for multi region execution.

Repository: usergrid
Updated Branches:
  refs/heads/USERGRID-1052 48161a1ac -> 4e51d3839


Adds a strong-consistency read to maps. Persists ES batches into Cassandra for multi-region execution.

A bug in wiring JSON to SQS still exists; it incorrectly escapes some message subtypes.
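
For readers skimming the patch, here is a minimal sketch of the flow this change introduces, using the names from the diff below (error handling, metrics, and logging are omitted for brevity):

    // Producer side: persist the serialized batch, then publish only its key to the topic.
    public void queueIndexOperationMessage( final IndexOperationMessage batch ) throws IOException {
        //JSON-encode the batch and write it to the Cassandra-backed map with a TTL
        final String json = OBJECT_JSON_SERIALIZER.toByteBuffer( batch );
        final UUID batchId = UUIDGenerator.newTimeUUID();
        esMapPersistence.putString( batchId.toString(), json, indexProcessorFig.getIndexMessageTtl() );

        //publish a small event carrying only the map key; the SNS topic fans it out to every region
        queue.sendMessageToTopic( new ElasticsearchIndexEvent( batchId ) );
    }

    // Consumer side (runs in each region): resolve the key back into the batch and index it.
    public void handleIndexOperation( final ElasticsearchIndexEvent event ) {
        final UUID batchId = event.getIndexBatchId();

        //fast local read first; fall back to a quorum read if replication hasn't caught up yet
        String json = esMapPersistence.getString( batchId.toString() );
        if ( json == null ) {
            json = esMapPersistence.getStringHighConsistency( batchId.toString() );
        }
        if ( json == null ) {
            throw new RuntimeException( "Unable to find the ES batch to process with message id " + batchId );
        }

        final IndexOperationMessage batch = OBJECT_JSON_SERIALIZER.fromString( json, IndexOperationMessage.class );
        indexProducer.put( batch ).toBlocking().last();
    }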


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/94a90781
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/94a90781
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/94a90781

Branch: refs/heads/USERGRID-1052
Commit: 94a9078125fc32d755e33e562f8e8fd8624641c1
Parents: 2b22c61
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 16 18:02:44 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 16 18:02:44 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 186 +++++++++++---
 .../asyncevents/AsyncEventService.java          |   1 +
 .../asyncevents/AsyncIndexProvider.java         |  22 +-
 .../asyncevents/model/AsyncEvent.java           |   3 +-
 .../model/ElasticsearchIndexEvent.java          |  50 ++++
 .../index/IndexProcessorFig.java                |   8 +
 .../util/ObjectJsonSerializer.java              |  74 ++++++
 .../index/AmazonAsyncEventServiceTest.java      |   6 +-
 .../index/AsyncIndexServiceTest.java            |   2 +-
 .../usergrid/persistence/map/MapManager.java    |  25 +-
 .../persistence/map/impl/MapManagerImpl.java    |   6 +
 .../persistence/map/impl/MapSerialization.java  |  27 +-
 .../map/impl/MapSerializationImpl.java          | 248 ++++++++++---------
 .../index/impl/DeIndexOperation.java            |   4 +
 .../persistence/index/impl/IndexOperation.java  |   4 +
 .../index/impl/IndexOperationMessage.java       |   5 +
 .../persistence/queue/DefaultQueueManager.java  |  12 +-
 .../persistence/queue/QueueManager.java         |   8 +-
 .../queue/impl/SNSQueueManagerImpl.java         | 188 ++++++++++----
 .../queue/impl/SQSQueueManagerImpl.java         |  28 ++-
 .../services/queues/ImportQueueManager.java     |   9 +-
 21 files changed, 666 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 95126c6..c9f0953 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -21,13 +21,20 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.exception.NotImplementedException;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +61,13 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -82,12 +94,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
 
+    private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer(  );
+
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
 
     private final QueueManager queue;
-    private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
     private final IndexProducer indexProducer;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -109,6 +122,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final AtomicLong counter = new AtomicLong();
     private final AtomicLong inFlight = new AtomicLong();
     private final Histogram messageCycle;
+    private final MapManager esMapPersistence;
 
     //the actively running subscription
     private List<Subscription> subscriptions = new ArrayList<>();
@@ -123,6 +137,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                     final IndexLocationStrategyFactory indexLocationStrategyFactory,
                                     final EntityIndexFactory entityIndexFactory,
                                     final EventBuilder eventBuilder,
+                                    final MapManagerFactory mapManagerFactory,
                                     final RxTaskScheduler rxTaskScheduler ) {
         this.indexProducer = indexProducer;
 
@@ -130,10 +145,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.eventBuilder = eventBuilder;
+
+        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
+
+        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
         this.rxTaskScheduler = rxTaskScheduler;
 
-        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
+
         this.indexProcessorFig = indexProcessorFig;
 
         this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
@@ -158,7 +179,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Offer the EntityIdScope to SQS
      */
-    private void offer(final Object operation) {
+    private void offer(final Serializable operation) {
         final Timer.Context timer = this.writeTimer.time();
 
         try {
@@ -213,7 +234,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Timer.Context timer = this.ackTimer.time();
 
         try{
-            queue.commitMessage(message);
+            queue.commitMessage( message );
 
             //decrement our in-flight counter
             inFlight.decrementAndGet();
@@ -235,7 +256,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final Timer.Context timer = this.ackTimer.time();
 
         try{
-            queue.commitMessages(messages);
+            queue.commitMessages( messages );
 
             //decrement our in-flight counter
             inFlight.decrementAndGet();
@@ -296,7 +317,13 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         handleInitializeApplicationIndex(event, message);
                         indexoperationObservable = Observable.just(new IndexOperationMessage());
                         validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                    } else {
+                    } else if (event instanceof ElasticsearchIndexEvent){
+                        handleIndexOperation( (ElasticsearchIndexEvent)event );
+                        indexoperationObservable = Observable.just( new IndexOperationMessage() );
+                        validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
+                    }
+
+                    else {
                         throw new Exception("Unknown EventType");//TODO: print json instead
                     }
 
@@ -434,6 +461,85 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
     }
 
+
+    /**
+     * Queue up an indexOperationMessage for multi region execution
+     * @param indexOperationMessage
+     */
+    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+        final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+
+        final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+        //write to the map in ES
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+
+
+
+        //now queue up the index message
+
+        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+        //send to the topic so all regions index the batch
+        try {
+            queue.sendMessageToTopic( elasticsearchIndexEvent );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to publish to topic", e );
+        }
+    }
+
+    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+        Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+        //load the entity
+
+        final String message = esMapPersistence.getString( messageId.toString() );
+
+        String highConsistency = null;
+
+        if(message == null){
+            logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+
+            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+        }
+
+        //read the value from the string
+
+        final IndexOperationMessage indexOperationMessage;
+
+        //our original local read has it, parse it.
+        if(message != null){
+             indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+        }
+        //we tried to read it at a higher consistency level and it works
+        else if (highConsistency != null){
+            indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+        }
+
+        //we couldn't find it, bail
+        else{
+            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+
+            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+        }
+
+
+
+        //now execute it
+        indexProducer.put(indexOperationMessage).toBlocking().last();
+
+    }
+
+
+
     @Override
     public long getQueueDepth() {
         return queue.getQueueDepth();
@@ -510,71 +616,75 @@ public class AmazonAsyncEventService implements AsyncEventService {
         synchronized (mutex) {
 
             Observable<List<QueueMessage>> consumer =
-                    Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
+                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
                         @Override
-                        public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
+                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
-                            Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+                            Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
                             List<QueueMessage> drainList = null;
 
                             do {
                                 try {
-                                    drainList = take().toList().toBlocking().lastOrDefault(null);
+                                    drainList = take().toList().toBlocking().lastOrDefault( null );
                                     //emit our list in its entirety to hand off to a worker pool
-                                    subscriber.onNext(drainList);
+                                    subscriber.onNext( drainList );
 
                                     //take since  we're in flight
-                                    inFlight.addAndGet(drainList.size());
-                                } catch (Throwable t) {
+                                    inFlight.addAndGet( drainList.size() );
+                                }
+                                catch ( Throwable t ) {
                                     final long sleepTime = indexProcessorFig.getFailureRetryTime();
 
-                                    logger.error("Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t);
+                                    logger.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
 
-                                    if (drainList != null) {
-                                        inFlight.addAndGet(-1 * drainList.size());
+                                    if ( drainList != null ) {
+                                        inFlight.addAndGet( -1 * drainList.size() );
                                     }
 
 
                                     try {
-                                        Thread.sleep(sleepTime);
-                                    } catch (InterruptedException ie) {
+                                        Thread.sleep( sleepTime );
+                                    }
+                                    catch ( InterruptedException ie ) {
                                         //swallow
                                     }
 
                                     indexErrorCounter.inc();
                                 }
                             }
-                            while (true);
+                            while ( true );
                         }
-                    })
+                    } )
                             //this won't block our read loop, just reads and proceeds
-                            .map(messages ->
-                            {
-                                if (messages == null || messages.size() == 0) {
+                            .map( messages -> {
+                                if ( messages == null || messages.size() == 0 ) {
                                     return null;
                                 }
 
                                 try {
-                                    List<IndexEventResult> indexEventResults = callEventHandlers(messages);
-                                    List<QueueMessage> messagesToAck = submitToIndex(indexEventResults);
-                                    if (messagesToAck == null || messagesToAck.size() == 0) {
-                                        logger.error("No messages came back from the queue operation should have seen "+messages.size(),messages);
+                                    List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+                                    List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+                                    if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+                                        logger.error( "No messages came back from the queue operation should have seen "
+                                            + messages.size(), messages );
                                         return messagesToAck;
                                     }
-                                    if(messagesToAck.size()<messages.size()){
-                                        logger.error("Missing messages from queue post operation",messages,messagesToAck);
+                                    if ( messagesToAck.size() < messages.size() ) {
+                                        logger.error( "Missing messages from queue post operation", messages,
+                                            messagesToAck );
                                     }
                                     //ack each message, but only if we didn't error.
-                                    ack(messagesToAck);
+                                    ack( messagesToAck );
                                     return messagesToAck;
-                                } catch (Exception e) {
-                                    logger.error("failed to ack messages to sqs", e);
+                                }
+                                catch ( Exception e ) {
+                                    logger.error( "failed to ack messages to sqs", e );
                                     return null;
                                     //do not rethrow so we can process all of them
                                 }
-                            });
+                            } );
 
             //start in the background
 
@@ -619,12 +729,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //send the batch
         //TODO: should retry?
-        try {
-            indexProducer.put(combined).toBlocking().lastOrDefault(null);
-        }catch (Exception e){
-            logger.error("Failed to submit to index producer",e);
-            throw e;
-        }
+        queueIndexOperationMessage( combined );
+
         return messagesToAck;
     }
 
@@ -671,4 +777,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
             return creationTime;
         }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index ae5688c..dcfffcb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index e9e36f0..3865ecb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -51,20 +52,18 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
     private final EntityIndexFactory entityIndexFactory;
     private final IndexProducer indexProducer;
+    private final MapManagerFactory mapManagerFactory;
 
     private AsyncEventService asyncEventService;
 
 
     @Inject
-    public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
-                              final QueueManagerFactory queueManagerFactory,
-                              final MetricsFactory metricsFactory,
-                              final RxTaskScheduler rxTaskScheduler,
-                              final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                              final EventBuilder eventBuilder,
-                              final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                              final EntityIndexFactory entityIndexFactory,
-                              final IndexProducer indexProducer) {
+    public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
+                               final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
+                                   EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                               final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
+                               final MapManagerFactory mapManagerFactory ) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
@@ -75,6 +74,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.indexLocationStrategyFactory = indexLocationStrategyFactory;
         this.entityIndexFactory = entityIndexFactory;
         this.indexProducer = indexProducer;
+        this.mapManagerFactory = mapManagerFactory;
     }
 
 
@@ -99,10 +99,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 6b45297..1af54e3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -39,7 +39,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ),
     @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ),
     @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ),
-    @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" )
+    @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ),
+    @JsonSubTypes.Type( value = ElasticsearchIndexEvent.class, name = "elasticsearchIndexEvent" )
 } )
 
 public abstract class AsyncEvent implements Serializable {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
new file mode 100644
index 0000000..207b15e
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+
+import java.util.UUID;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * An index event for publishing to elastic search
+ */
+public final class ElasticsearchIndexEvent extends AsyncEvent {
+
+
+    @JsonProperty
+    protected UUID indexBatchId;
+
+    public ElasticsearchIndexEvent() {
+    }
+
+    public ElasticsearchIndexEvent(  UUID indexBatchId ) {
+        this.indexBatchId = indexBatchId;
+    }
+
+
+    /**
+     * Get the unique id of the index batch to process
+     * @return the index batch id
+     */
+    public UUID getIndexBatchId() {
+        return indexBatchId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..6fd73b4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -103,4 +103,12 @@ public interface IndexProcessorFig extends GuicyFig {
     @Default("false")
     @Key("elasticsearch.queue_impl.resolution")
     boolean resolveSynchronously();
+
+    /**
+     * Get the message TTL in milliseconds
+     * @return
+     */
+    @Default("604800000")
+    @Key( "elasticsearch.message.ttl" )
+    int getIndexMessageTtl();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
new file mode 100644
index 0000000..dbd5ca3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.corepersistence.util;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+
+/**
+ * A utility class to serialize and deserialize objects as JSON strings
+ */
+public final class ObjectJsonSerializer {
+
+
+    private final JsonFactory JSON_FACTORY = new JsonFactory();
+
+    private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+    public ObjectJsonSerializer( ) {
+        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
+    }
+
+
+    public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+
+        Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
+        final String stringValue;
+        //serialize the object to a JSON string
+
+        //any Jackson failure is wrapped in a RuntimeException
+        try {
+            stringValue = MAPPER.writeValueAsString( toSerialize );
+        }
+        catch ( JsonProcessingException jpe ) {
+            throw new RuntimeException( "Unable to serialize entity", jpe );
+        }
+
+        return stringValue;
+    }
+
+
+    public <T extends Serializable> T fromString( final String value, final Class<T> toSerialize ) {
+
+        Preconditions.checkNotNull( value, "value must not be null" );
+
+        try {
+            return MAPPER.readValue( value, toSerialize );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to deserialize", e );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index a14437c..e83d6f8 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -79,13 +80,16 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     @Inject
     public IndexLocationStrategyFactory indexLocationStrategyFactory;
 
+    @Inject
+    public MapManagerFactory mapManagerFactory;
+
 
     @Inject
     public EntityIndexFactory entityIndexFactory;
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
index d34a1a9..2863cbf 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java
@@ -189,7 +189,7 @@ public abstract class AsyncIndexServiceTest {
             }
 
             try {
-                Thread.sleep( 100 );
+                Thread.sleep( 10000 );
             }
             catch ( InterruptedException e ) {
                 //swallow

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index c280fee..80e2d17 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -33,7 +33,14 @@ public interface MapManager {
     /**
      * Return the string, null if not found
      */
-    public String getString( final String key );
+    String getString( final String key );
+
+    /**
+     * Read the string at a high consistency level.  This should ensure data replication has happened
+     * @param key
+     * @return
+     */
+    String getStringHighConsistency(final String key);
 
 
     /**
@@ -41,12 +48,12 @@ public interface MapManager {
      * @param keys
      * @return
      */
-    public Map<String, String> getStrings(final Collection<String> keys);
+    Map<String, String> getStrings( final Collection<String> keys );
 
     /**
      * Return the string, null if not found
      */
-    public void putString( final String key, final String value );
+    void putString( final String key, final String value );
 
     /**
      * The time to live (in seconds) of the string
@@ -54,33 +61,33 @@ public interface MapManager {
      * @param value
      * @param ttl
      */
-    public void putString( final String key, final String value, final int ttl );
+    void putString( final String key, final String value, final int ttl );
 
 
     /**
      * Return the uuid, null if not found
      */
-    public UUID getUuid( final String key );
+    UUID getUuid( final String key );
 
     /**
      * Return the uuid, null if not found
      */
-    public void putUuid( final String key, final UUID putUuid );
+    void putUuid( final String key, final UUID putUuid );
 
     /**
      * Return the long, null if not found
      */
-    public Long getLong( final String key );
+    Long getLong( final String key );
 
     /**
      * Return the long, null if not found
      */
-    public void putLong( final String key, final Long value );
+    void putLong( final String key, final Long value );
 
     /**
      * Delete the key
      *
      * @param key The key used to delete the entry
      */
-    public void delete( final String key );
+    void delete( final String key );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index fb2e7ff..501ade7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -53,6 +53,12 @@ public class MapManagerImpl implements MapManager {
 
 
     @Override
+    public String getStringHighConsistency( final String key ) {
+        return mapSerialization.getStringHighConsistency(scope, key);
+    }
+
+
+    @Override
     public Map<String, String> getStrings( final Collection<String> keys ) {
         return mapSerialization.getStrings( scope, keys );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 2e958c2..e9c21d2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -32,50 +32,59 @@ public interface MapSerialization extends Migration {
     /**
      * Return the string, null if not found
      */
-    public String getString( final MapScope scope, final String key );
+    String getString( final MapScope scope, final String key );
+
+
+    /**
+     * Get the key from all regions with a high consistency
+     * @param scope
+     * @param key
+     * @return
+     */
+    String getStringHighConsistency( final MapScope scope, final String key );
 
     /**
      * Get strings from the map
      * @param keys
      * @return
      */
-    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+    Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
 
     /**
      * Return the string, null if not found
      */
-    public void putString( final MapScope scope, final String key, final String value );
+    void putString( final MapScope scope, final String key, final String value );
 
     /**
      * Write the string
      */
-    public void putString( final MapScope scope, final String key, final String value, final int ttl );
+    void putString( final MapScope scope, final String key, final String value, final int ttl );
 
 
     /**
      * Return the uuid, null if not found
      */
-    public UUID getUuid( final MapScope scope, final String key );
+    UUID getUuid( final MapScope scope, final String key );
 
     /**
      * Return the uuid, null if not found
      */
-    public void putUuid( final MapScope scope, final String key, final UUID putUuid );
+    void putUuid( final MapScope scope, final String key, final UUID putUuid );
 
     /**
      * Return the long, null if not found
      */
-    public Long getLong( final MapScope scope, final String key );
+    Long getLong( final MapScope scope, final String key );
 
     /**
      * Return the long, null if not found
      */
-    public void putLong( final MapScope scope, final String key, final Long value );
+    void putLong( final MapScope scope, final String key, final Long value );
 
     /**
      * Delete the key
      *
      * @param key The key used to delete the entry
      */
-    public void delete( final MapScope scope, final String key );
+    void delete( final MapScope scope, final String key );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 825d636..1aa3229 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,6 +18,8 @@
  */
 
 package org.apache.usergrid.persistence.map.impl;
+
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,21 +28,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.google.common.base.Preconditions;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.shard.ExpandingShardLocator;
 import org.apache.usergrid.persistence.core.shard.StringHashUtils;
 import org.apache.usergrid.persistence.map.MapScope;
 
+import com.google.common.base.Preconditions;
 import com.google.common.hash.Funnel;
 import com.google.common.hash.PrimitiveSink;
 import com.google.inject.Inject;
@@ -53,9 +55,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.ConsistencyLevel;
 import com.netflix.astyanax.model.Row;
 import com.netflix.astyanax.model.Rows;
-import com.netflix.astyanax.query.ColumnFamilyQuery;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -65,41 +67,40 @@ public class MapSerializationImpl implements MapSerialization {
 
     private static final MapKeySerializer KEY_SERIALIZER = new MapKeySerializer();
 
-        private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
-                new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
+    private static final BucketScopedRowKeySerializer<String> MAP_KEY_SERIALIZER =
+        new BucketScopedRowKeySerializer<>( KEY_SERIALIZER );
 
 
-        private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
-        private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
-                new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
+    private static final MapEntrySerializer ENTRY_SERIALIZER = new MapEntrySerializer();
+    private static final ScopedRowKeySerializer<MapEntryKey> MAP_ENTRY_SERIALIZER =
+        new ScopedRowKeySerializer<>( ENTRY_SERIALIZER );
 
 
-        private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+    private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
 
-        private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
+    private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
     private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
 
 
-        /**
-         * CFs where the row key contains the source node id
-         */
-        public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
-            MAP_ENTRIES = new MultiTennantColumnFamily<>(
-                "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
+    /**
+     * CFs where the row key contains the source node id
+     */
+    public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean> MAP_ENTRIES =
+        new MultiTennantColumnFamily<>( "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
 
 
-        /**
-         * CFs where the row key contains the source node id
-         */
-        public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
-                new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
+    /**
+     * CFs where the row key contains the source node id
+     */
+    public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
+        new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
 
     /**
      * Number of buckets to hash across.
      */
-    private static final int[] NUM_BUCKETS = {20};
+    private static final int[] NUM_BUCKETS = { 20 };
 
     /**
      * How to funnel keys for buckets
@@ -107,7 +108,6 @@ public class MapSerializationImpl implements MapSerialization {
     private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
 
 
-
         @Override
         public void funnel( final String key, final PrimitiveSink into ) {
             into.putString( key, StringHashUtils.UTF8 );
@@ -117,8 +117,8 @@ public class MapSerializationImpl implements MapSerialization {
     /**
      * Locator to get us all buckets
      */
-    private static final ExpandingShardLocator<String>
-            BUCKET_LOCATOR = new ExpandingShardLocator<>(MAP_KEY_FUNNEL, NUM_BUCKETS);
+    private static final ExpandingShardLocator<String> BUCKET_LOCATOR =
+        new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
 
     private final Keyspace keyspace;
 
@@ -129,13 +129,20 @@ public class MapSerializationImpl implements MapSerialization {
 
     @Override
     public String getString( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue(scope, key); // TODO: why boolean?
-        return (col !=null) ?  col.getStringValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getStringValue() : null;
     }
 
 
     @Override
-    public Map<String, String> getStrings(final MapScope scope,  final Collection<String> keys ) {
+    public String getStringHighConsistency( final MapScope scope, final String key ) {
+        Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+        return ( col != null ) ? col.getStringValue() : null;
+    }
+
+
+    @Override
+    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys ) {
         return getValues( scope, keys, STRING_RESULTS_BUILDER );
     }
 
@@ -144,13 +151,13 @@ public class MapSerializationImpl implements MapSerialization {
     public void putString( final MapScope scope, final String key, final String value ) {
         final RowOp op = new RowOp() {
             @Override
-            public void putValue(final ColumnListMutation<Boolean> columnListMutation ) {
+            public void putValue( final ColumnListMutation<Boolean> columnListMutation ) {
                 columnListMutation.putColumn( true, value );
             }
 
 
             @Override
-            public void putKey(final ColumnListMutation<String> keysMutation ) {
+            public void putKey( final ColumnListMutation<String> keysMutation ) {
                 keysMutation.putColumn( key, true );
             }
         };
@@ -184,10 +191,6 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Write our string index with the specified row op
-     * @param scope
-     * @param key
-     * @param value
-     * @param rowOp
      */
     private void writeString( final MapScope scope, final String key, final String value, final RowOp rowOp ) {
 
@@ -225,10 +228,11 @@ public class MapSerializationImpl implements MapSerialization {
     /**
      * Callbacks for performing row operations
      */
-    private static interface RowOp{
+    private static interface RowOp {
 
         /**
          * Callback to do the row
+         *
          * @param columnListMutation The column mutation
          */
         void putValue( final ColumnListMutation<Boolean> columnListMutation );
@@ -236,104 +240,97 @@ public class MapSerializationImpl implements MapSerialization {
 
         /**
          * Write the key
-         * @param keysMutation
          */
         void putKey( final ColumnListMutation<String> keysMutation );
-
-
     }
 
+
     @Override
     public UUID getUuid( final MapScope scope, final String key ) {
 
-        Column<Boolean> col = getValue(scope, key);
-        return (col !=null) ?  col.getUUIDValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getUUIDValue() : null;
     }
 
 
     @Override
     public void putUuid( final MapScope scope, final String key, final UUID putUuid ) {
 
-        Preconditions.checkNotNull(scope, "mapscope is required");
+        Preconditions.checkNotNull( scope, "mapscope is required" );
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( putUuid, "value is required" );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, putUuid);
+        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, putUuid );
 
         //add it to the keys
 
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
 
-        final BucketScopedRowKey< String> keyRowKey =
-                BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
 
         //serialize to the entry
-        batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
-
-        executeBatch(batch);
+        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
 
+        executeBatch( batch );
     }
 
 
     @Override
     public Long getLong( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue(scope, key);
-        return (col !=null) ?  col.getLongValue(): null;
+        Column<Boolean> col = getValue( scope, key );
+        return ( col != null ) ? col.getLongValue() : null;
     }
 
 
-
-
     @Override
     public void putLong( final MapScope scope, final String key, final Long value ) {
 
-        Preconditions.checkNotNull(scope, "mapscope is required");
+        Preconditions.checkNotNull( scope, "mapscope is required" );
         Preconditions.checkNotNull( key, "key is required" );
         Preconditions.checkNotNull( value, "value is required" );
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).putColumn(true, value);
+        batch.withRow( MAP_ENTRIES, entryRowKey ).putColumn( true, value );
 
         //add it to the keys
         final int bucket = BUCKET_LOCATOR.getCurrentBucket( key );
 
-               final BucketScopedRowKey< String> keyRowKey =
-                       BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket);
+        final BucketScopedRowKey<String> keyRowKey = BucketScopedRowKey.fromKey( scope.getApplication(), key, bucket );
 
         //serialize to the entry
-        batch.withRow(MAP_KEYS, keyRowKey).putColumn(key, true);
+        batch.withRow( MAP_KEYS, keyRowKey ).putColumn( key, true );
 
-        executeBatch(batch);
+        executeBatch( batch );
     }
 
 
     @Override
     public void delete( final MapScope scope, final String key ) {
         final MutationBatch batch = keyspace.prepareMutationBatch();
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //serialize to the entry
-        batch.withRow(MAP_ENTRIES, entryRowKey).delete();
+        batch.withRow( MAP_ENTRIES, entryRowKey ).delete();
 
         //add it to the keys, we're not sure which one it may have come from
-       final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
+        final int[] buckets = BUCKET_LOCATOR.getAllBuckets( key );
 
 
-        final List<BucketScopedRowKey<String>>
-                rowKeys = BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
+        final List<BucketScopedRowKey<String>> rowKeys =
+            BucketScopedRowKey.fromRange( scope.getApplication(), key, buckets );
 
-        for(BucketScopedRowKey<String> rowKey: rowKeys) {
+        for ( BucketScopedRowKey<String> rowKey : rowKeys ) {
             batch.withRow( MAP_KEYS, rowKey ).deleteColumn( key );
         }
 
@@ -345,34 +342,53 @@ public class MapSerializationImpl implements MapSerialization {
     public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
 
         final MultiTennantColumnFamilyDefinition mapEntries =
-                new MultiTennantColumnFamilyDefinition( MAP_ENTRIES,
-                       BytesType.class.getSimpleName(),
-                       BytesType.class.getSimpleName(),
-                       BytesType.class.getSimpleName(),
-                       MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+            new MultiTennantColumnFamilyDefinition( MAP_ENTRIES, BytesType.class.getSimpleName(),
+                BytesType.class.getSimpleName(), BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
 
         final MultiTennantColumnFamilyDefinition mapKeys =
-                new MultiTennantColumnFamilyDefinition( MAP_KEYS,
-                        BytesType.class.getSimpleName(),
-                        UTF8Type.class.getSimpleName(),
-                        BytesType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+            new MultiTennantColumnFamilyDefinition( MAP_KEYS, BytesType.class.getSimpleName(),
+                UTF8Type.class.getSimpleName(), BytesType.class.getSimpleName(),
+                MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
 
         return Arrays.asList( mapEntries, mapKeys );
     }
 
 
-    private  Column<Boolean> getValue(MapScope scope, String key) {
+    private Column<Boolean> getValue( MapScope scope, String key ) {
+
+
+        //add it to the entry
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
+
+        //now get all columns, including the "old row key value"
+        try {
+            final Column<Boolean> result =
+                keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
+
+            return result;
+        }
+        catch ( NotFoundException nfe ) {
+            //nothing to return
+            return null;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+    }
+
 
+    private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
 
 
         //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
         //now get all columns, including the "old row key value"
         try {
-            final Column<Boolean> result = keyspace.prepareQuery( MAP_ENTRIES )
-                    .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+            final Column<Boolean> result =
+                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+                        .getKey( entryRowKey ).getColumn( true ).execute().getResult();
 
             return result;
         }
@@ -388,52 +404,45 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Get multiple values, using the string builder
-     * @param scope
-     * @param keys
-     * @param builder
-     * @param <T>
-     * @return
      */
-    private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+    private <T> T getValues( final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder ) {
 
 
         final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
 
-        for(final String key: keys){
-             //add it to the entry
-            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+        for ( final String key : keys ) {
+            //add it to the entry
+            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
 
             rowKeys.add( entryRowKey );
-
         }
 
 
+        //now get all columns, including the "old row key value"
+        try {
+            final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
+                keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+                        .getResult();
 
-          //now get all columns, including the "old row key value"
-          try {
-              final Rows<ScopedRowKey<MapEntryKey>, Boolean>
-                  rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
-                                                     .execute().getResult();
-
-
-             return builder.buildResults( rows );
-          }
-          catch ( NotFoundException nfe ) {
-              //nothing to return
-              return null;
-          }
-          catch ( ConnectionException e ) {
-              throw new RuntimeException( "Unable to connect to cassandra", e );
-          }
-      }
 
+            return builder.buildResults( rows );
+        }
+        catch ( NotFoundException nfe ) {
+            //nothing to return
+            return null;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+    }
 
 
-    private void executeBatch(MutationBatch batch) {
+    private void executeBatch( MutationBatch batch ) {
         try {
             batch.execute();
-        } catch (ConnectionException e) {
-            throw new RuntimeException("Unable to connect to cassandra", e);
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
         }
     }
 
@@ -501,8 +510,7 @@ public class MapSerializationImpl implements MapSerialization {
         /**
          * Create a scoped row key from the key
          */
-        public static ScopedRowKey<MapEntryKey> fromKey(
-                final MapScope mapScope, final String key ) {
+        public static ScopedRowKey<MapEntryKey> fromKey( final MapScope mapScope, final String key ) {
 
             return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
         }
@@ -511,32 +519,32 @@ public class MapSerializationImpl implements MapSerialization {
 
     /**
      * Build the results from the row keys
-     * @param <T>
      */
     private static interface ResultsBuilder<T> {
 
-        public T buildResults(final  Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+        public T buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows );
     }
 
-    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>> {
 
         @Override
         public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
             final int size = rows.size();
 
-            final Map<String, String> results = new HashMap<>(size);
+            final Map<String, String> results = new HashMap<>( size );
 
-            for(int i = 0; i < size; i ++){
+            for ( int i = 0; i < size; i++ ) {
 
                 final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
 
                 final String value = row.getColumns().getStringValue( true, null );
 
-                if(value == null){
+                if ( value == null ) {
                     continue;
                 }
 
-               results.put( row.getKey().getKey().key,  value );
+                results.put( row.getKey().getKey().key, value );
             }
 
             return results;

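A note on the two read paths above: the default getValue() read uses the keyspace's configured
consistency level, while getValueHighConsistency() forces CL_QUORUM so the read consults a
majority of replicas. A minimal fragment contrasting the two, reusing the keyspace, MAP_ENTRIES
and MapEntryKey members shown in the diff (the NotFoundException / ConnectionException handling
from the class is omitted here):

    // default read path: whatever consistency level the keyspace was configured with
    Column<Boolean> fast = keyspace.prepareQuery( MAP_ENTRIES )
        .getKey( MapEntryKey.fromKey( scope, key ) )
        .getColumn( true )
        .execute().getResult();

    // strong read path: CL_QUORUM consults a majority of replicas, which is why it is used
    // as the fallback when the default read misses a recently written value
    Column<Boolean> strong = keyspace.prepareQuery( MAP_ENTRIES )
        .setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
        .getKey( MapEntryKey.fromKey( scope, key ) )
        .getColumn( true )
        .execute().getResult();
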
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
index 4f47749..4060dac 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexOperation.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.SearchEdge;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
@@ -42,7 +43,10 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createInd
 @JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
 public class DeIndexOperation implements BatchOperation {
 
+    @JsonProperty
     private String[] indexes;
+
+    @JsonProperty
     private String documentId;
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
index fae809f..28f2e0d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperation.java
@@ -30,6 +30,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 
 /**
@@ -37,9 +38,12 @@ import org.elasticsearch.client.Client;
  */
 public class IndexOperation implements BatchOperation {
 
+    @JsonProperty
     public String writeAlias;
+    @JsonProperty
     public String documentId;
 
+    @JsonProperty
     public Map<String, Object> data;
 
     public IndexOperation( final String writeAlias, final ApplicationScope applicationScope, IndexEdge indexEdge,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 12df390..bcee308 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 
 
@@ -33,9 +34,13 @@ import com.google.common.base.Optional;
  * Container for index operations.
  */
 public class IndexOperationMessage implements Serializable {
+    @JsonProperty
     private final Set<IndexOperation> indexRequests;
+
+    @JsonProperty
     private final Set<DeIndexOperation> deIndexRequests;
 
+    @JsonProperty
     private long creationTime;
 
 

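The @JsonProperty annotations added across these three classes expose the private (and final)
fields to Jackson so the whole batch can be round-tripped as JSON through the queue and the
Cassandra-backed map. A small standalone sketch of the same pattern; the Payload class and its
field are purely illustrative, not types from the codebase:

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonPropertyExample {

        public static class Payload {
            @JsonProperty
            private final String documentId;   // private and final, still serialized thanks to @JsonProperty

            @JsonCreator
            public Payload( @JsonProperty( "documentId" ) final String documentId ) {
                this.documentId = documentId;
            }
        }

        public static void main( String[] args ) throws Exception {
            final ObjectMapper mapper = new ObjectMapper();

            final String json = mapper.writeValueAsString( new Payload( "doc-1" ) );
            System.out.println( json );                                   // {"documentId":"doc-1"}

            final Payload restored = mapper.readValue( json, Payload.class );
            System.out.println( restored.documentId );                    // doc-1
        }
    }
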
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index d974529..5201279 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.queue;
 import rx.Observable;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -67,12 +68,21 @@ public class DefaultQueueManager implements QueueManager {
         }
     }
 
+
     @Override
-    public synchronized void sendMessage(Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
         String uuid = UUID.randomUUID().toString();
         queue.add(new QueueMessage(uuid,"handle_"+uuid,body,"put type here"));
+
     }
 
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+       sendMessage( body );
+    }
+
+
     @Override
     public void deleteQueue() {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 027abb2..dc3d1b5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,13 @@ public interface QueueManager {
      * @param body
      * @throws IOException
      */
-    void sendMessage(Object body)throws IOException;
+    <T extends Serializable> void sendMessage(T body) throws IOException;
+
+    /**
+     * Send a message to the topic to be distributed to the other queues
+     * @param body
+     */
+    <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
 
     /**
      * purge messages

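The move from Object to a Serializable-bounded generic gives callers a compile-time guarantee
that a queued body can actually be persisted and shipped between regions. A rough usage sketch
against the new interface; the ReIndexRequested payload and the enqueue() helper are illustrative
only, not part of the codebase:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.usergrid.persistence.queue.QueueManager;

    public class QueueUsageSketch {

        // any queued body must now implement Serializable to satisfy the bound
        static class ReIndexRequested implements Serializable {
            final String entityId;
            ReIndexRequested( final String entityId ) { this.entityId = entityId; }
        }

        static void enqueue( final QueueManager queueManager ) throws IOException {
            // with the SNS implementation this delivers to the current region's queue only
            queueManager.sendMessage( new ReIndexRequested( "example-entity-id" ) );

            // with the SNS implementation this publishes to the topic, fanning out to the other regions' queues
            queueManager.sendMessageToTopic( new ReIndexRequested( "example-entity-id" ) );
        }
    }
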
http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index d476f76..59ecd24 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -18,15 +18,55 @@
 package org.apache.usergrid.persistence.queue.impl;
 
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNSAsyncClient;
 import com.amazonaws.services.sns.AmazonSNSClient;
-import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sns.model.PublishRequest;
+import com.amazonaws.services.sns.model.PublishResult;
+import com.amazonaws.services.sns.model.SubscribeRequest;
+import com.amazonaws.services.sns.model.SubscribeResult;
+import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.*;
+import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
+import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
+import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.DeleteQueueRequest;
+import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
+import com.amazonaws.services.sqs.model.GetQueueUrlResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -36,20 +76,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 
 public class SNSQueueManagerImpl implements QueueManager {
 
@@ -59,10 +85,10 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final CassandraFig cassandraFig;
-    private final QueueFig queueFig;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
     private final AmazonSNSAsyncClient snsAsync;
+    private final AmazonSQSAsyncClient sqsAsync;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -110,6 +136,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         });
 
 
+
     @Inject
     public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
                                CassandraFig cassandraFig, QueueFig queueFig) {
@@ -117,12 +144,21 @@ public class SNSQueueManagerImpl implements QueueManager {
         this.fig = fig;
         this.clusterFig = clusterFig;
         this.cassandraFig = cassandraFig;
-        this.queueFig = queueFig;
+
+
+        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+        final ExecutorService executor = TaskExecutorFactory
+            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+                TaskExecutorFactory.RejectionAction.CALLERRUNS);
+
+
+        final Region region = getRegion();
 
         try {
-            sqs = createSQSClient(getRegion());
-            sns = createSNSClient(getRegion());
-            snsAsync = createAsyncSNSClient(getRegion());
+            sqs = createSQSClient(region);
+            sns = createSNSClient(region);
+            snsAsync = createAsyncSNSClient(region, executor);
+            sqsAsync = createAsyncSQSClient( region, executor );
 
         } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
@@ -157,7 +193,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         try {
 
             SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
-            sns.subscribe(primarySubscribeRequest);
+             sns.subscribe(primarySubscribeRequest);
 
             // ensure the SNS primary topic has permission to send to the primary SQS queue
             List<String> primaryTopicArnList = new ArrayList<>();
@@ -276,22 +312,35 @@ public class SNSQueueManagerImpl implements QueueManager {
      *
      */
 
-    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
+    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
 
-        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
-        final Executor executor = TaskExecutorFactory
-            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
-                TaskExecutorFactory.RejectionAction.CALLERRUNS);
-
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), (ExecutorService) executor);
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
 
         sns.setRegion(region);
 
         return sns;
     }
 
+
+    /**
+     * Create the async sqs client
+     * @param region
+     * @param executor
+     * @return
+     */
+    private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+        final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
+
+        sqs.setRegion( region );
+
+        return sqs;
+
+    }
+
     /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
      *
@@ -369,7 +418,12 @@ public class SNSQueueManagerImpl implements QueueManager {
                 try {
                     final JsonNode bodyNode =  mapper.readTree(message.getBody());
                     JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
-                    body = fromString(bodyObj.textValue(), klass);
+
+
+
+                    final String bodyText = mapper.writeValueAsString( bodyObj );
+
+                    body = fromString(bodyText, klass);
                 } catch (Exception e) {
                     logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
                     throw new RuntimeException(e);
@@ -405,6 +459,40 @@ public class SNSQueueManagerImpl implements QueueManager {
         }
     }
 
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+        if (snsAsync == null) {
+            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+            return;
+        }
+
+        final String stringBody = toString(body);
+
+        String topicArn = getWriteTopicArn();
+
+        if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+
+        snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+            @Override
+            public void onError( Exception e ) {
+                logger.error( "Error publishing message... {}", e );
+            }
+
+
+            @Override
+            public void onSuccess( PublishRequest request, PublishResult result ) {
+                if ( logger.isDebugEnabled() ) logger
+                    .debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
+                        request.getTopicArn() );
+            }
+        } );
+
+    }
+
+
     @Override
     public void sendMessages(final List bodies) throws IOException {
 
@@ -414,41 +502,47 @@ public class SNSQueueManagerImpl implements QueueManager {
         }
 
         for (Object body : bodies) {
-            sendMessage(body);
+            sendMessage((Serializable)body);
         }
 
     }
 
+
     @Override
-    public void sendMessage(final Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 
-        if (snsAsync == null) {
-            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
             return;
         }
 
-        final String stringBody = toString(body);
+        final String stringBody = toString( body );
 
-        String topicArn = getWriteTopicArn();
+        String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Publishing Message...{} to url: {}", stringBody, url );
+        }
 
-        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+        SendMessageRequest request = new SendMessageRequest( url, stringBody );
 
-        snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
-            @Override
-            public void onError(Exception e) {
-                logger.error("Error publishing message... {}", e);
-            }
+        sqsAsync.sendMessageAsync( request, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
 
             @Override
-            public void onSuccess(PublishRequest request, PublishResult result) {
-                if (logger.isDebugEnabled())
-                    logger.debug("Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(), request.getTopicArn());
+            public void onError( final Exception e ) {
 
+                logger.error( "Error sending message... {}", e );
             }
-        });
 
+
+            @Override
+            public void onSuccess( final SendMessageRequest request, final SendMessageResult sendMessageResult ) {
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug( "Successfully send... messageBody=[{}],  url=[{}]", request.getMessageBody(),
+                        request.getQueueUrl() );
+                }
+            }
+        } );
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index fa9a7ac..0c56c05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.queue.impl;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
@@ -243,25 +244,34 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     }
 
+
     @Override
-    public void sendMessage(final Object body) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 
         if (sqs == null) {
-            logger.error("Sqs is null");
-            return;
-        }
+              logger.error("Sqs is null");
+              return;
+          }
 
-        String url = getQueue().getUrl();
+          String url = getQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
+          if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
 
-        final String stringBody = toString(body);
+          final String stringBody = toString(body);
 
-        SendMessageRequest request = new SendMessageRequest(url, stringBody);
-        sqs.sendMessage(request);
+          SendMessageRequest request = new SendMessageRequest(url, stringBody);
+          sqs.sendMessage(request);
+      }
+
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
+        sendMessage( body );
     }
 
 
+
     @Override
     public void commitMessage(final QueueMessage queueMessage) {
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/94a90781/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index bca9a49..bc55ff4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.services.queues;
 
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -65,7 +66,13 @@ public class ImportQueueManager implements QueueManager {
 
 
     @Override
-    public void sendMessage( final Object body ) throws IOException {
+    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
+
+    }
+
+
+    @Override
+    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
 
     }
 


[04/18] usergrid git commit: Fixes empty payload notification issue.

Posted by to...@apache.org.
Fixes empty payload notification issue.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0326629a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0326629a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0326629a

Branch: refs/heads/USERGRID-1052
Commit: 0326629a24cec3bd44d91810b4b8f0516c69c9b8
Parents: 3e15585
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:53:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:53:30 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 55 ++++++++++++--------
 .../asyncevents/AsyncIndexProvider.java         |  4 +-
 .../index/AmazonAsyncEventServiceTest.java      |  2 +-
 3 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f8ef5e7..6b2eb45 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -89,6 +89,21 @@ import rx.Subscription;
 import rx.schedulers.Schedulers;
 
 
+/**
+ * TODO: this whole class is becoming a nightmare.  We need to remove all consuming logic from this class and refactor it in the following manner.
+ *
+ * 1. Produce:  keep the code in the handlers as is.
+ * 2. Consume:  move the code into a refactored system.
+ * 2.1 A central dispatcher.
+ * 2.2 An interface that produces an observable of type BatchOperation.  Any handler will be refactored into its own
+ *      impl that will then emit a stream of batch operations to perform.
+ * 2.3 The central dispatcher will then subscribe to these events and merge them, handing them off to a batch handler.
+ * 2.4 The batch handler will roll the operations up to the batch size, and then queue them.
+ * 2.5 The receive batch handler will execute the batch operations.
+ *
+ * TODO: determine how we handle errors.
+ *
+ */
 @Singleton
 public class AmazonAsyncEventService implements AsyncEventService {
 
@@ -360,7 +375,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope );
-        offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+        offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+            new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
 
 
@@ -503,35 +519,29 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final String message = esMapPersistence.getString( messageId.toString() );
 
-        String highConsistency = null;
+        final IndexOperationMessage indexOperationMessage;
 
         if(message == null){
             logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
 
-            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
-
-        }
+            final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 
-        //read the value from the string
+            if(highConsistency == null){
+                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
 
-        final IndexOperationMessage indexOperationMessage;
+                throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+            }
 
-        //our original local read has it, parse it.
-        if(message != null){
-             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
-        }
-        //we tried to read it at a higher consistency level and it works
-        else if (highConsistency != null){
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
-        }
 
-        //we couldn't find it, bail
-        else{
-            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
-
-            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+        } else{
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
+        //read the value from the string
+
+        Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+        Preconditions.checkArgument( !indexOperationMessage.isEmpty(), "queued indexOperationMessage messages should not be empty" );
 
 
         //now execute it
@@ -728,9 +738,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
             .map(result -> result.getQueueMessage().get())
             .collect(Collectors.toList());
 
-        //send the batch
-        //TODO: should retry?
-        queueIndexOperationMessage( combined );
+        //only Q it if it's empty
+        if(!combined.isEmpty()) {
+            queueIndexOperationMessage( combined );
+        }
 
         return messagesToAck;
     }

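Relating to the refactoring TODO added at the top of this class, a rough sketch of what the
handler/dispatcher split from steps 2.1-2.3 could look like. Everything here is hypothetical:
AsyncEventHandler and AsyncEventDispatcher do not exist in the codebase, and BatchOperation refers
to the existing interface from the query index module:

    import java.util.List;

    import org.apache.usergrid.persistence.index.impl.BatchOperation;

    import rx.Observable;

    // step 2.2: each event type gets its own implementation that emits the
    // batch operations it wants performed
    interface AsyncEventHandler {
        boolean canHandle( Object event );
        Observable<BatchOperation> handle( Object event );
    }

    // steps 2.1 and 2.3: a central dispatcher selects the handler and merges its
    // emissions so a downstream batcher can roll them up and queue them
    class AsyncEventDispatcher {

        private final List<AsyncEventHandler> handlers;

        AsyncEventDispatcher( final List<AsyncEventHandler> handlers ) {
            this.handlers = handlers;
        }

        Observable<BatchOperation> dispatch( final Object event ) {
            return Observable.from( handlers )
                .filter( handler -> handler.canHandle( event ) )
                .take( 1 )
                .flatMap( handler -> handler.handle( event ) );
        }
    }
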
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 1649046..2bace8d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -105,10 +105,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
             case LOCAL:
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
             case SQS:
-                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
+                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0326629a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 8ee47a2..625a8fd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -93,7 +93,7 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
 
     @Override
     protected AsyncEventService getAsyncEventService() {
-        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
+        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig,  rxTaskScheduler );
     }
 
 


[12/18] usergrid git commit: Fixes incorrect units on timeout from millis to seconds.

Posted by to...@apache.org.
Fixes incorrect units on timeout from millis to seconds.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1fe1d1a3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1fe1d1a3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1fe1d1a3

Branch: refs/heads/USERGRID-1052
Commit: 1fe1d1a34e806ebfc53d0c6cb729e7e4aab804a1
Parents: e50835f
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 11:37:03 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 11:44:35 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1fe1d1a3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6f779b5..7034a67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -491,8 +492,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         final UUID newMessageId = UUIDGenerator.newTimeUUID();
 
+        final int expirationTimeInSeconds =
+            ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
         //write to the map in ES
-        esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+        esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
 
 
 

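The fix converts the configured TTL from milliseconds (as returned by IndexProcessorFig) to the
seconds the map persistence expects. Worth noting that TimeUnit truncates rather than rounds; a
quick standalone illustration:

    import java.util.concurrent.TimeUnit;

    public class TtlConversionExample {

        public static void main( String[] args ) {
            // e.g. a 90 second TTL configured in milliseconds
            final long ttlMillis = 90_000L;

            final int ttlSeconds = (int) TimeUnit.MILLISECONDS.toSeconds( ttlMillis );
            System.out.println( ttlSeconds );                              // 90

            // anything under one second truncates to 0, so a very small TTL would be lost
            System.out.println( TimeUnit.MILLISECONDS.toSeconds( 999L ) ); // 0
        }
    }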

[10/18] usergrid git commit: Add null check back for batch sending of SQS w/proper SQS client. Less logging would occur in the event of batch send when the client is not initialized.

Posted by to...@apache.org.
Add null check back for batch sending of SQS w/proper SQS client.  Less logging would occur in the event of batch send when the client is not initialized.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbb6c823
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbb6c823
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbb6c823

Branch: refs/heads/USERGRID-1052
Commit: fbb6c823b7e2a1f5f55ab044942b38d1157970c0
Parents: d8e6572
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 21:28:46 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 21:28:46 2015 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/queue/impl/SNSQueueManagerImpl.java    | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbb6c823/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 3c18992..1bb00dc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,6 +543,11 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void sendMessages( final List bodies ) throws IOException {
 
+        if ( sqsAsync == null ) {
+            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
+            return;
+        }
+
         for ( Object body : bodies ) {
             sendMessage( ( Serializable ) body );
         }


[03/18] usergrid git commit: Merge branch 'refs/heads/2.1-release' into USERGRID-1048

Posted by to...@apache.org.
Merge branch 'refs/heads/2.1-release' into USERGRID-1048


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3e155852
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3e155852
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3e155852

Branch: refs/heads/USERGRID-1052
Commit: 3e1558524728c96834cfd66d9e53e3f1b6a7d3d6
Parents: 04a3f47 a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:30:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:30:04 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 149 ++++++++++---------
 .../asyncevents/AsyncIndexProvider.java         |  26 ++--
 .../asyncevents/model/AsyncEvent.java           |  14 +-
 .../asyncevents/model/EdgeDeleteEvent.java      |   6 +-
 .../asyncevents/model/EdgeIndexEvent.java       |   9 +-
 .../asyncevents/model/EntityDeleteEvent.java    |   8 +-
 .../asyncevents/model/EntityIndexEvent.java     |   6 +-
 .../model/InitializeApplicationIndexEvent.java  |   4 +-
 .../index/AmazonAsyncEventServiceTest.java      |   6 +-
 .../cache/CachedEntityCollectionManager.java    | 147 ------------------
 .../EntityCollectionManagerFactoryImpl.java     |   6 -
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../queue/impl/SNSQueueManagerImpl.java         |   8 +-
 13 files changed, 135 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d319ac8,c198674..f8ef5e7
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -97,7 -84,9 +100,8 @@@ public class AmazonAsyncEventService im
      public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
  
      private final QueueManager queue;
 -    private final QueueScope queueScope;
      private final IndexProcessorFig indexProcessorFig;
+     private final QueueFig queueFig;
      private final IndexProducer indexProducer;
      private final EntityCollectionManagerFactory entityCollectionManagerFactory;
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -125,33 -113,28 +129,35 @@@
  
  
      @Inject
 -    public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
 -                                   final IndexProcessorFig indexProcessorFig,
 -                                   final IndexProducer indexProducer,
 -                                   final MetricsFactory metricsFactory,
 -                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
 -                                   final IndexLocationStrategyFactory indexLocationStrategyFactory,
 -                                   final EntityIndexFactory entityIndexFactory,
 -                                   final EventBuilder eventBuilder,
 -                                   final RxTaskScheduler rxTaskScheduler,
 -                                   QueueFig queueFig) {
 +    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
 +                                    final IndexProcessorFig indexProcessorFig,
 +                                    final IndexProducer indexProducer,
 +                                    final MetricsFactory metricsFactory,
 +                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
 +                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
 +                                    final EntityIndexFactory entityIndexFactory,
 +                                    final EventBuilder eventBuilder,
 +                                    final MapManagerFactory mapManagerFactory,
++                                    final QueueFig queueFig,
 +                                    final RxTaskScheduler rxTaskScheduler ) {
          this.indexProducer = indexProducer;
  
          this.entityCollectionManagerFactory = entityCollectionManagerFactory;
          this.indexLocationStrategyFactory = indexLocationStrategyFactory;
          this.entityIndexFactory = entityIndexFactory;
          this.eventBuilder = eventBuilder;
 +
 +        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
 +
 +        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
 +
          this.rxTaskScheduler = rxTaskScheduler;
  
 -        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
 +        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
          this.queue = queueManagerFactory.getQueueManager(queueScope);
 +
          this.indexProcessorFig = indexProcessorFig;
+         this.queueFig = queueFig;
  
          this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
          this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
@@@ -271,70 -259,64 +277,78 @@@
              logger.debug("callEventHandlers with {} message", messages.size());
          }
  
--        Stream<IndexEventResult> indexEventResults = messages.stream()
--            .map(message -> {
--                AsyncEvent event = null;
--                try {
--                    event = (AsyncEvent) message.getBody();
--                } catch (ClassCastException cce) {
--                    logger.error("Failed to deserialize message body", cce);
++        Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
++            AsyncEvent event = null;
++            try {
++                event = ( AsyncEvent ) message.getBody();
++            }
++            catch ( ClassCastException cce ) {
++                logger.error( "Failed to deserialize message body", cce );
++            }
++
++            if ( event == null ) {
++                logger.error( "AsyncEvent type or event is null!" );
++                return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
++                    System.currentTimeMillis() );
++            }
++
++            final AsyncEvent thisEvent = event;
++            if ( logger.isDebugEnabled() ) {
++                logger.debug( "Processing {} event", event );
++            }
++
++            try {
++                //check for empty sets if this is true
++                boolean validateEmptySets = true;
++                Observable<IndexOperationMessage> indexoperationObservable;
++                //merge each operation to a master observable;
++                if ( event instanceof EdgeDeleteEvent ) {
++                    indexoperationObservable = handleEdgeDelete( message );
++                }
++                else if ( event instanceof EdgeIndexEvent ) {
++                    indexoperationObservable = handleEdgeIndex( message );
++                }
++                else if ( event instanceof EntityDeleteEvent ) {
++                    indexoperationObservable = handleEntityDelete( message );
++                }
++                else if ( event instanceof EntityIndexEvent ) {
++                    indexoperationObservable = handleEntityIndexUpdate( message );
++                }
++                else if ( event instanceof InitializeApplicationIndexEvent ) {
++                    //does not return observable
++                    handleInitializeApplicationIndex( event, message );
++                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
++                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
++                }
++                else if ( event instanceof ElasticsearchIndexEvent ) {
++                    handleIndexOperation( ( ElasticsearchIndexEvent ) event );
++                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
++                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
                  }
  
--                if (event == null) {
--                    logger.error("AsyncEvent type or event is null!");
--                    return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
++                else {
++                    throw new Exception( "Unknown EventType" );//TODO: print json instead
                  }
  
--                final AsyncEvent thisEvent = event;
--                if (logger.isDebugEnabled()) {
--                    logger.debug("Processing {} event", event);
++                //collect all of the
++                IndexOperationMessage indexOperationMessage = indexoperationObservable
++                    .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
++                    .toBlocking().lastOrDefault( null );
++
++                if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
++                    logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
++                        message.getStringBody() );
++                    throw new Exception( "Received empty index sequence." );
                  }
  
--                try {
--                    //check for empty sets if this is true
--                    boolean validateEmptySets = true;
--                    Observable<IndexOperationMessage> indexoperationObservable;
--                    //merge each operation to a master observable;
--                    if (event instanceof EdgeDeleteEvent) {
--                        indexoperationObservable = handleEdgeDelete(message);
--                    } else if (event instanceof EdgeIndexEvent) {
--                        indexoperationObservable = handleEdgeIndex(message);
--                    } else if (event instanceof EntityDeleteEvent) {
--                        indexoperationObservable = handleEntityDelete(message);
--                    } else if (event instanceof EntityIndexEvent) {
--                        indexoperationObservable = handleEntityIndexUpdate(message);
--                    } else if (event instanceof InitializeApplicationIndexEvent) {
--                        //does not return observable
--                        handleInitializeApplicationIndex(event, message);
--                        indexoperationObservable = Observable.just(new IndexOperationMessage());
-                         validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                     } else if (event instanceof ElasticsearchIndexEvent){
-                         handleIndexOperation( (ElasticsearchIndexEvent)event );
-                         indexoperationObservable = Observable.just( new IndexOperationMessage() );
--                        validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                     }
- 
-                     else {
 -                    } else {
--                        throw new Exception("Unknown EventType");//TODO: print json instead
--                    }
--
--                    //collect all of the
--                    IndexOperationMessage indexOperationMessage =
--                        indexoperationObservable
--                            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
--                            .toBlocking().lastOrDefault(null);
--
--                    if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
--                        logger.error("Received empty index sequence message:({}), body:({}) ",
--                            message.getMessageId(), message.getStringBody());
--                        throw new Exception("Received empty index sequence.");
--                    }
--
--                    //return type that can be indexed and ack'd later
--                    return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
--                } catch (Exception e) {
--                    logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
--                    return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
++                //return type that can be indexed and ack'd later
++                return new IndexEventResult( Optional.fromNullable( message ),
++                    Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
++            }
++            catch ( Exception e ) {
++                logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
++                return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
++                    event.getCreationTime());
                  }
              });
  
@@@ -346,8 -328,7 +360,7 @@@
      public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
          IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
              applicationScope );
-         offerTopic(
-             new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
 -        offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
++        offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
      }
  
  
@@@ -413,8 -394,8 +426,8 @@@
  
          final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
  
--        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
--            applicationScope, entity, edge));
++        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
++            entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
          return edgeIndexObservable;
      }
  
@@@ -450,84 -431,9 +463,84 @@@
      @Override
      public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
  
-         offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
+         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
      }
  
 +
 +    /**
 +     * Queue up an indexOperationMessage for multi region execution
 +     * @param indexOperationMessage
 +     */
 +    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
 +
 +        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
 +
 +        final UUID newMessageId = UUIDGenerator.newTimeUUID();
 +
 +        //write to the map in ES
 +        esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
 +
 +
 +
 +        //now queue up the index message
 +
 +        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
 +
 +        //send to the topic so all regions index the batch
 +
 +        offerTopic( elasticsearchIndexEvent );
 +    }
 +
 +    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
 +         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 +
 +        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
 +
 +        Preconditions.checkNotNull( messageId, "messageId must not be null" );
 +
 +
 +        //load the entity
 +
 +        final String message = esMapPersistence.getString( messageId.toString() );
 +
 +        String highConsistency = null;
 +
 +        if(message == null){
 +            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level", messageId );
 +
 +            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 +
 +        }
 +
 +        //read the value from the string
 +
 +        final IndexOperationMessage indexOperationMessage;
 +
 +        //our original local read has it, parse it.
 +        if(message != null){
 +             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
 +        }
 +        //we tried to read it at a higher consistency level and it worked
 +        else if (highConsistency != null){
 +            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
 +        }
 +
 +        //we couldn't find it, bail
 +        else{
 +            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level", messageId );
 +
 +            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
 +        }
 +
 +
 +
 +        //now execute it
 +        indexProducer.put(indexOperationMessage).toBlocking().last();
 +
 +    }
 +
 +
 +
      @Override
      public long getQueueDepth() {
          return queue.getQueueDepth();
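
The pair of methods added above (queueIndexOperationMessage / handleIndexOperation) amounts to a persist-then-point pattern: the serialized batch is written to the Cassandra-backed map with a TTL, only the batch id is published to the topic, and each consuming region resolves that id, retrying at a stronger consistency level before giving up. The following is a minimal, self-contained sketch of that flow; the class and helper names are hypothetical stand-ins, not the actual Usergrid APIs.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public class BatchPointerSketch {

        // Stand-in for the ES/Cassandra-backed map the real code reaches through its map persistence.
        private final Map<String, String> batchStore = new ConcurrentHashMap<>();

        /** Producer side: persist the serialized batch, then enqueue only its id. */
        public UUID queueBatch( final String serializedBatch ) {
            final UUID batchId = UUID.randomUUID();                 // the real code uses a time UUID
            batchStore.put( batchId.toString(), serializedBatch );  // the real code also sets a TTL
            publishToTopic( batchId );                              // every region receives just the pointer
            return batchId;
        }

        /** Consumer side: resolve the pointer, falling back to a stronger read before failing. */
        public String resolveBatch( final UUID batchId ) {
            String batch = batchStore.get( batchId.toString() );    // normal read first
            if ( batch == null ) {
                batch = readWithHigherConsistency( batchId );       // e.g. a quorum read instead of a local one
            }
            if ( batch == null ) {
                throw new IllegalStateException( "Unable to find the ES batch " + batchId );
            }
            return batch;
        }

        private void publishToTopic( final UUID batchId ) {
            // SNS publish in the real code; left as a no-op to keep the sketch self-contained.
        }

        private String readWithHigherConsistency( final UUID batchId ) {
            // An in-memory map has no weaker or stronger read; this only mirrors the call shape.
            return batchStore.get( batchId.toString() );
        }
    }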

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 3865ecb,8b44714..1649046
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -28,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
  import org.apache.usergrid.persistence.index.EntityIndexFactory;
  import org.apache.usergrid.persistence.index.impl.IndexProducer;
 +import org.apache.usergrid.persistence.map.MapManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueFig;
  import org.apache.usergrid.persistence.queue.QueueManagerFactory;
  
  import com.google.inject.Inject;
@@@ -52,18 -51,21 +52,24 @@@ public class AsyncIndexProvider impleme
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
      private final EntityIndexFactory entityIndexFactory;
      private final IndexProducer indexProducer;
 +    private final MapManagerFactory mapManagerFactory;
+     private final QueueFig queueFig;
  
      private AsyncEventService asyncEventService;
  
  
      @Inject
-     public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
-                                final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
-                                    EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
-                                final MapManagerFactory mapManagerFactory ) {
+     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
+                               final QueueManagerFactory queueManagerFactory,
+                               final MetricsFactory metricsFactory,
+                               final RxTaskScheduler rxTaskScheduler,
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder,
+                               final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                               final EntityIndexFactory entityIndexFactory,
 -                              final IndexProducer indexProducer, QueueFig queueFig) {
++                              final IndexProducer indexProducer,
++                              final MapManagerFactory mapManagerFactory,
++                              final QueueFig queueFig) {
  
          this.indexProcessorFig = indexProcessorFig;
          this.queueManagerFactory = queueManagerFactory;
@@@ -74,7 -76,7 +80,8 @@@
          this.indexLocationStrategyFactory = indexLocationStrategyFactory;
          this.entityIndexFactory = entityIndexFactory;
          this.indexProducer = indexProducer;
 +        this.mapManagerFactory = mapManagerFactory;
+         this.queueFig = queueFig;
      }
  
  
@@@ -98,11 -100,11 +105,10 @@@
              case LOCAL:
                  return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
              case SQS:
--                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
++                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
              case SNS:
                  return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
++                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
              default:
                  throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
          }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index e83d6f8,5b921d9..8ee47a2
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@@ -89,7 -89,7 +93,7 @@@ public class AmazonAsyncEventServiceTes
  
      @Override
      protected AsyncEventService getAsyncEventService() {
-         return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig );
++        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
      }
  
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index a3fa05e,5ab1a4b..58b2a4d
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -226,44 -171,43 +226,44 @@@ public class SNSQueueManagerImpl implem
  
              String multiRegion = fig.getRegionList();
  
 -            if (logger.isDebugEnabled())
 -                logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
 +            if ( logger.isDebugEnabled() ) {
 +                logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
 +            }
  
 -            String[] regionNames = multiRegion.split(",");
 +            String[] regionNames = multiRegion.split( "," );
  
 -            final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
 -            final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
 +            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
 +            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
  
-             arrQueueArns.put( primaryQueueArn, fig.getRegion() );
-             topicArns.put( primaryTopicArn, fig.getRegion() );
+             arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
+             topicArns.put(primaryTopicArn, fig.getPrimaryRegion());
  
 -            for (String regionName : regionNames) {
 +            for ( String regionName : regionNames ) {
  
                  regionName = regionName.trim();
 -                Regions regions = Regions.fromName(regionName);
 -                Region region = Region.getRegion(regions);
 +                Regions regions = Regions.fromName( regionName );
 +                Region region = Region.getRegion( regions );
  
 -                AmazonSQSClient sqsClient = createSQSClient(region);
 -                AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
 +                AmazonSQSClient sqsClient = createSQSClient( region );
 +                AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
  
                  // getTopicArn will create the SNS topic if it doesn't exist
 -                String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
 -                topicArns.put(topicArn, regionName);
 +                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
 +                topicArns.put( topicArn, regionName );
  
                  // create the SQS queue if it doesn't exist
 -                String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
 -                if (queueArn == null) {
 -                    queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
 -                    queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
 +                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
 +                if ( queueArn == null ) {
 +                    queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
 +                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
                  }
  
 -                arrQueueArns.put(queueArn, regionName);
 +                arrQueueArns.put( queueArn, regionName );
              }
  
 -            logger.debug("Creating Subscriptions...");
 +            logger.debug( "Creating Subscriptions..." );
  
 -            for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
 +            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                  String queueARN = queueArnEntry.getKey();
                  String strSqsRegion = queueArnEntry.getValue();
  
@@@ -650,10 -519,12 +650,10 @@@
  
      /**
       * Get the region
 -     *
 -     * @return
       */
      private Region getRegion() {
-         Regions regions = Regions.fromName( fig.getRegion() );
-         return Region.getRegion( regions );
+         Regions regions = Regions.fromName(fig.getPrimaryRegion());
+         return Region.getRegion(regions);
      }
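
The region loop above follows a fan-out shape: for each configured region it makes sure a topic and a queue exist, collects their ARNs, and then subscribes every queue to every topic so that a message published in any one region is delivered to all of them. Below is a compact sketch of that shape, using hypothetical client interfaces in place of the AWS SDK types.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical per-region messaging client; the real code uses AmazonSNSClient and AmazonSQSClient.
    interface RegionClient {
        String ensureTopic( String name );   // returns the topic ARN, creating the topic if absent
        String ensureQueue( String name );   // returns the queue ARN, creating the queue if absent
        void subscribeQueueToTopic( String topicArn, String queueArn );
    }

    class MultiRegionSetupSketch {

        void setup( final String name, final Map<String, RegionClient> clientsByRegion ) {
            final Map<String, String> topicArnsByRegion = new HashMap<>();
            final Map<String, String> queueArnsByRegion = new HashMap<>();

            // Pass 1: make sure a topic and a queue exist in every region.
            clientsByRegion.forEach( ( region, client ) -> {
                topicArnsByRegion.put( region, client.ensureTopic( name ) );
                queueArnsByRegion.put( region, client.ensureQueue( name ) );
            } );

            // Pass 2: cross-subscribe, so a message published to any region's topic
            // lands in the queue of every region.
            clientsByRegion.forEach( ( topicRegion, client ) ->
                queueArnsByRegion.values().forEach( queueArn ->
                    client.subscribeQueueToTopic( topicArnsByRegion.get( topicRegion ), queueArn ) ) );
        }
    }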
  
  


[06/18] usergrid git commit: Adds comment

Posted by to...@apache.org.
Adds comment


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4013f17e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4013f17e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4013f17e

Branch: refs/heads/USERGRID-1052
Commit: 4013f17e434a5eb05f4851d28dba6a3e0bfe8fc8
Parents: 19d30ea
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 14:10:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 14:10:51 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/AmazonAsyncEventService.java     | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4013f17e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 67d0dab..ee9054b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -536,6 +536,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
 
+
+        //NOTE that we intentionally do NOT delete from the map.  We can't know when all regions have consumed the message
+        //so we'll let compaction on column expiration handle deletion
+
         //read the value from the string
 
         Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
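
The note above reflects a deliberate design choice: because the same batch id is consumed independently in every region, no single consumer can know when the batch is safe to remove, so the write relies on column expiration rather than an explicit delete. A rough sketch of that write path, with a hypothetical TTL-aware store standing in for the map persistence used here:

    import java.util.UUID;
    import java.util.concurrent.TimeUnit;

    // Hypothetical TTL-aware store; the real write goes through the map persistence's put-with-TTL call.
    interface ExpiringStore {
        void put( String key, String value, int ttlSeconds );
    }

    class BatchWriterSketch {

        private final ExpiringStore store;

        // Assumed TTL for illustration; the real value is read from configuration.
        private final int ttlSeconds = ( int ) TimeUnit.DAYS.toSeconds( 1 );

        BatchWriterSketch( final ExpiringStore store ) {
            this.store = store;
        }

        void persistBatch( final UUID batchId, final String serializedBatch ) {
            // Write once with a TTL and never delete explicitly, so consumers in every
            // region can still read the batch until the column expires and compaction removes it.
            store.put( batchId.toString(), serializedBatch, ttlSeconds );
        }
    }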


[16/18] usergrid git commit: Added more logging statements

Posted by to...@apache.org.
Added more logging statements


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d35fea56
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d35fea56
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d35fea56

Branch: refs/heads/USERGRID-1052
Commit: d35fea5636acfd384f0b35069cdc59e515ec0372
Parents: 8b4faf7
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 10:04:21 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 10:04:21 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/results/ObservableQueryExecutor.java      | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d35fea56/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 7b31d19..7a32dce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -23,6 +23,9 @@ package org.apache.usergrid.corepersistence.results;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
 import org.apache.usergrid.persistence.Results;
 
@@ -39,6 +42,7 @@ import rx.Observable;
 public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
 
 
+    private static final Logger logger = LoggerFactory.getLogger( ObservableQueryExecutor.class );
 
     private Results results;
     private Optional<String> cursor;
@@ -131,9 +135,12 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
             observable = buildNewResultsPage( cursor ).map( resultsPage -> createResultsInternal( resultsPage ) ).defaultIfEmpty(
             new Results() );
 
+        logger.trace( "Trying to load results page" );
         //take the first from our observable
         final Results resultsPage = observable.take(1).toBlocking().first();
 
+        logger.trace( "Results page loaded {}", resultsPage );
+
         //set the results for the iterator
         this.results = resultsPage;
 


[18/18] usergrid git commit: Upgrades RX to latest stable

Posted by to...@apache.org.
Upgrades RX to latest stable

Adds deleted markers for the nodes on edges


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e51d383
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e51d383
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e51d383

Branch: refs/heads/USERGRID-1052
Commit: 4e51d3839cecc9d55ea0bc4f65d5610d29b34fb9
Parents: f73ac4c
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 14:16:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 14:16:17 2015 -0600

----------------------------------------------------------------------
 .../read/traverse/AbstractReadGraphFilter.java  |  2 +-
 .../usergrid/persistence/graph/MarkedEdge.java  | 13 +++++
 .../graph/impl/GraphManagerImpl.java            | 54 ++++++++++----------
 .../graph/impl/SimpleMarkedEdge.java            | 33 ++++++++++--
 stack/corepersistence/pom.xml                   |  2 +-
 5 files changed, 71 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index 88c912a..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
+            return graphManager.loadEdgesFromSource( search )
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
index 4b5eeaa..da6fedb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/MarkedEdge.java
@@ -38,4 +38,17 @@ public interface MarkedEdge extends Edge{
      */
     boolean isDeleted();
 
+    /**
+     * Return true if the source node is deleted
+     * @return
+     */
+    boolean isSourceNodeDelete();
+
+    /**
+     * Return true if the target node is deleted
+     * @return
+     */
+    boolean isTargetNodeDeleted();
+
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index c1e9cea..1bcb398 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -291,7 +291,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( searchByEdge.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesVersionsTimer );
     }
@@ -306,7 +306,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceTimer );
     }
@@ -321,7 +321,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
 
         return ObservableTimer.time( edges, loadEdgesToTargetTimer );
@@ -337,7 +337,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter( search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesFromSourceByTypeTimer );
     }
@@ -352,7 +352,7 @@ public class GraphManagerImpl implements GraphManager {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
                 }
             } ).buffer( graphFig.getScanPageSize() )
-                      .compose( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) );
+                      .compose( new EdgeBufferFilter(  search.filterMarked() ) );
 
         return ObservableTimer.time( edges, loadEdgesToTargetByTypeTimer );
     }
@@ -420,12 +420,10 @@ public class GraphManagerImpl implements GraphManager {
         Observable.Transformer<List<MarkedEdge>, MarkedEdge> {//implements Func1<List<MarkedEdge>,
         // Observable<MarkedEdge>> {
 
-        private final long maxVersion;
         private final boolean filterMarked;
 
 
-        private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
-            this.maxVersion = maxVersion;
+        private EdgeBufferFilter( final boolean filterMarked ) {
             this.filterMarked = filterMarked;
         }
 
@@ -444,23 +442,16 @@ public class GraphManagerImpl implements GraphManager {
 
                 final Observable<MarkedEdge> markedEdgeObservable = Observable.from( markedEdges );
 
-                /**
-                 * We aren't going to filter anything, return exactly what we're passed
-                 */
-                if(!filterMarked){
-                    return markedEdgeObservable;
-                }
-
                 //We need to filter, perform that filter
                 final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
 
-                return markedEdgeObservable.filter( edge -> {
-                    final long edgeTimestamp = edge.getTimestamp();
+                return markedEdgeObservable.map( edge -> {
 
-                    //our edge needs to not be deleted and have a version that's > max Version
-                    if ( edge.isDeleted() ) {
-                        return false;
-                    }
+                    /**
+                     * Make sure we mark source and target deleted nodes as such
+                     */
+
+                    final long edgeTimestamp = edge.getTimestamp();
 
 
                     final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
@@ -468,22 +459,29 @@ public class GraphManagerImpl implements GraphManager {
                     //the source Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
-                        return false;
-                    }
+                    final boolean isSourceDeleted =  ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 );
 
                     final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
 
                     //the target Id has been marked for deletion.  It's version is <= to the marked version for
                     // deletion,
                     // so we need to discard it
-                    if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
-                        return false;
+                    final  boolean isTargetDeleted = ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 );
+
+                    //one has been marked for deletion, return it
+                    if(isSourceDeleted || isTargetDeleted){
+                        return new SimpleMarkedEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), edge.getTimestamp(), edge.isDeleted(), isSourceDeleted, isTargetDeleted );
                     }
 
+                    return edge;
+                } ).filter( simpleMarkedEdge -> {
+                    if(!filterMarked){
+                        return true;
+                    }
 
-                    return true;
-                } );
+                    //if any one of these is true, we filter it
+                    return !(simpleMarkedEdge.isDeleted() || simpleMarkedEdge.isSourceNodeDelete() || simpleMarkedEdge.isTargetNodeDeleted());
+                });
             } );
         }
     }
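
The EdgeBufferFilter change above stops silently dropping edges whose source or target node has been marked deleted; it first flags the edge and only then filters, so callers that skip filtering still see the deletion markers. A simplified sketch of that two-step shape, using plain collections and a hypothetical edge type rather than the Rx pipeline and the graph classes:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Hypothetical edge type carrying the two deletion flags; the real code builds a SimpleMarkedEdge.
    final class EdgeSketch {
        final String source;
        final String target;
        final long timestamp;
        final boolean sourceDeleted;
        final boolean targetDeleted;

        EdgeSketch( final String source, final String target, final long timestamp,
                    final boolean sourceDeleted, final boolean targetDeleted ) {
            this.source = source;
            this.target = target;
            this.timestamp = timestamp;
            this.sourceDeleted = sourceDeleted;
            this.targetDeleted = targetDeleted;
        }
    }

    final class MarkThenFilterSketch {

        /**
         * markedVersions maps a node id to the timestamp at which it was marked deleted;
         * an edge no newer than that mark inherits the deletion flag.
         */
        static List<EdgeSketch> apply( final List<EdgeSketch> edges, final Map<String, Long> markedVersions,
                                       final boolean filterMarked ) {
            return edges.stream()
                // step 1: mark, never drop, so the flags survive for callers that want them
                .map( e -> {
                    final Long sourceMark = markedVersions.get( e.source );
                    final Long targetMark = markedVersions.get( e.target );
                    final boolean sourceDeleted = sourceMark != null && e.timestamp <= sourceMark;
                    final boolean targetDeleted = targetMark != null && e.timestamp <= targetMark;
                    return new EdgeSketch( e.source, e.target, e.timestamp, sourceDeleted, targetDeleted );
                } )
                // step 2: filter only when the caller asked for filtered results
                .filter( e -> !filterMarked || ( !e.sourceDeleted && !e.targetDeleted ) )
                .collect( Collectors.toList() );
        }
    }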

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 29d90eb..c6dc2e4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -34,12 +34,23 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
     private final boolean deleted;
+    private final boolean isSourceNodeDeleted;
+    private final boolean isTargetNodeDeleted;
 
 
     public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp, final boolean deleted) {
 
-        super(sourceNode, type, targetNode, timestamp);
+        this( sourceNode, type, targetNode, timestamp, deleted, false, false );
+    }
+
+
+    public SimpleMarkedEdge( final Id sourceNode, final String type, final Id targetNode, final long timestamp,
+                             final boolean deleted, final boolean isSourceNodeDeleted,
+                             final boolean isTargetNodeDeleted ) {
+        super( sourceNode, type, targetNode, timestamp );
         this.deleted = deleted;
+        this.isSourceNodeDeleted = isSourceNodeDeleted;
+        this.isTargetNodeDeleted = isTargetNodeDeleted;
     }
 
 
@@ -56,6 +67,18 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
 
 
     @Override
+    public boolean isSourceNodeDelete() {
+        return isSourceNodeDeleted;
+    }
+
+
+    @Override
+    public boolean isTargetNodeDeleted() {
+        return isTargetNodeDeleted;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;
@@ -81,6 +104,8 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + ( deleted ? 1 : 0 );
+        result = 31 * result + ( isSourceNodeDeleted ? 1 : 0 );
+        result = 31 * result + ( isTargetNodeDeleted ? 1 : 0 );
         return result;
     }
 
@@ -88,8 +113,10 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
     @Override
     public String toString() {
         return "SimpleMarkedEdge{" +
-                "deleted=" + deleted +
-                "} " + super.toString();
+            "deleted=" + deleted +
+            ", isSourceNodeDeleted=" + isSourceNodeDeleted +
+            ", isTargetNodeDeleted=" + isTargetNodeDeleted +
+            "} " + super.toString();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e51d383/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4b47bc0..4e4648e 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -70,7 +70,7 @@ limitations under the License.
         <junit.version>4.11</junit.version>
         <kryo-serializers.version>0.26</kryo-serializers.version>
         <log4j.version>1.2.17</log4j.version>
-        <rx.version>1.0.12</rx.version>
+        <rx.version>1.0.14</rx.version>
         <slf4j.version>1.7.2</slf4j.version>
         <surefire.version>2.16</surefire.version>
         <aws.version>1.10.6</aws.version>


[17/18] usergrid git commit: Added back pressure block as a test

Posted by to...@apache.org.
Added back pressure block as a test


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f73ac4c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f73ac4c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f73ac4c7

Branch: refs/heads/USERGRID-1052
Commit: f73ac4c724088150bc1e0315942e93306321a72a
Parents: d35fea5
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 10:45:10 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 10:50:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/asyncevents/EventBuilderImpl.java        | 3 ++-
 .../usergrid/corepersistence/pipeline/read/FilterResult.java | 7 +++++++
 .../pipeline/read/traverse/AbstractReadGraphFilter.java      | 2 +-
 .../usergrid/persistence/graph/impl/GraphManagerImpl.java    | 8 ++++----
 4 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index cc0356b..18f080b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -155,7 +155,8 @@ public class EventBuilderImpl implements EventBuilder {
         //observable of entries as the batches are deleted
         final Observable<List<MvccLogEntry>> entries =
             ecm.getVersions( entityId ).buffer( serializationFig.getBufferSize() )
-               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ) );
+               .doOnNext( buffer -> ecm.delete( buffer ) ).doOnCompleted( () -> gm.compactNode( entityId ).toBlocking().lastOrDefault(null) );
+
 
 
         return new EntityDeleteResults( edgeObservable, entries );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
index 3c41a2b..915af03 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/FilterResult.java
@@ -53,4 +53,11 @@ public class FilterResult<T> {
     }
 
 
+    @Override
+    public String toString() {
+        return "FilterResult{" +
+            "path=" + path +
+            ", value=" + value +
+            '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index d3e0345..88c912a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -85,7 +85,7 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
             /**
              * TODO, pass a message with pointers to our cursor values to be generated later
              */
-            return graphManager.loadEdgesFromSource( search )
+            return graphManager.loadEdgesFromSource( search ).onBackpressureBlock()
                 //set the edge state for cursors
                 .doOnNext( edge -> {
                     logger.trace( "Seeking over edge {}", edge );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f73ac4c7/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index e119c59..c1e9cea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -300,7 +300,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSource" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSource( scope, search );
@@ -315,7 +315,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTarget" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTarget( scope, search );
@@ -331,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesFromSourceByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -346,7 +346,7 @@ public class GraphManagerImpl implements GraphManager {
     @Override
     public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
         final Observable<Edge> edges =
-            Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeTypesFromSource" ) {
+            Observable.create( new ObservableIterator<MarkedEdge>( "loadEdgesToTargetByType" ) {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );


[11/18] usergrid git commit: Fixes comments and refactors map to be a cleaner read pattern

Posted by to...@apache.org.
Fixes comments and refactors map to be a cleaner read pattern


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e50835f1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e50835f1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e50835f1

Branch: refs/heads/USERGRID-1052
Commit: e50835f1016a9988513a00b64337222957fa7747
Parents: fbb6c82
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Oct 20 09:34:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Oct 20 09:34:43 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  2 +-
 .../core/astyanax/CassandraConfig.java          |  8 ++---
 .../core/astyanax/CassandraConfigImpl.java      |  1 +
 .../map/impl/MapSerializationImpl.java          | 37 ++++----------------
 .../queue/impl/SNSQueueManagerImpl.java         |  4 +--
 5 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 38c2966..6f779b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -521,7 +521,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final IndexOperationMessage indexOperationMessage;
 
         if(message == null){
-            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+            logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
                 messageId);
 
             final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index 817aee2..dba3646 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -34,25 +34,25 @@ public interface CassandraConfig {
      * Get the currently configured ReadCL
      * @return
      */
-    public ConsistencyLevel getReadCL();
+    ConsistencyLevel getReadCL();
 
     /**
      * Get the currently configured ReadCL that is more consitent than getReadCL
      * @return
      */
-    public ConsistencyLevel getConsistentReadCL();
+    ConsistencyLevel getConsistentReadCL();
 
     /**
      * Get the currently configured write CL
      * @return
      */
-    public ConsistencyLevel getWriteCL();
+    ConsistencyLevel getWriteCL();
 
     /**
      * Return the number of shards that has been set in the property file
      * @return
      */
-    public int[] getShardSettings();
+    int[] getShardSettings();
 
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
index 17b91c6..7373322 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfigImpl.java
@@ -83,6 +83,7 @@ public class CassandraConfigImpl implements CassandraConfig {
     public ConsistencyLevel getConsistentReadCL() {
         return consistentCl;
     }
+
     @Override
     public ConsistencyLevel getWriteCL() {
         return writeCl;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index ffe10c9..ceeb3ad 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -129,14 +129,14 @@ public class MapSerializationImpl implements MapSerialization {
 
     @Override
     public String getString( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue( scope, key );
+        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL()  );
         return ( col != null ) ? col.getStringValue() : null;
     }
 
 
     @Override
     public String getStringHighConsistency( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValueHighConsistency( scope, key ); // TODO: why boolean?
+        Column<Boolean> col = getValue( scope, key, cassandraConfig.getConsistentReadCL() ); // TODO: why boolean?
         return ( col != null ) ? col.getStringValue() : null;
     }
 
@@ -248,7 +248,7 @@ public class MapSerializationImpl implements MapSerialization {
     @Override
     public UUID getUuid( final MapScope scope, final String key ) {
 
-        Column<Boolean> col = getValue( scope, key );
+        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
         return ( col != null ) ? col.getUUIDValue() : null;
     }
 
@@ -283,7 +283,7 @@ public class MapSerializationImpl implements MapSerialization {
 
     @Override
     public Long getLong( final MapScope scope, final String key ) {
-        Column<Boolean> col = getValue( scope, key );
+        Column<Boolean> col = getValue( scope, key, cassandraConfig.getReadCL() );
         return ( col != null ) ? col.getLongValue() : null;
     }
 
@@ -355,31 +355,7 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
-    private Column<Boolean> getValue( MapScope scope, String key ) {
-
-
-        //add it to the entry
-        final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
-
-        //now get all columns, including the "old row key value"
-        try {
-            final Column<Boolean> result =
-                keyspace.prepareQuery( MAP_ENTRIES ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
-
-            return result;
-        }
-        catch ( NotFoundException nfe ) {
-            //nothing to return
-            return null;
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to connect to cassandra", e );
-        }
-    }
-
-
-    private Column<Boolean> getValueHighConsistency( MapScope scope, String key ) {
-
+    private Column<Boolean> getValue( MapScope scope, String key, final ConsistencyLevel consistencyLevel ) {
 
         //add it to the entry
         final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey( scope, key );
@@ -387,8 +363,7 @@ public class MapSerializationImpl implements MapSerialization {
         //now get all columns, including the "old row key value"
         try {
             final Column<Boolean> result =
-                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() )
-                        .getKey( entryRowKey ).getColumn( true ).execute().getResult();
+                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( consistencyLevel ).getKey( entryRowKey ).getColumn( true ).execute().getResult();
 
             return result;
         }
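
The refactor above folds the duplicated getValue / getValueHighConsistency read paths into a single query parameterized by consistency level, which keeps the normal and the strongly consistent reads from drifting apart. A small sketch of the same idea, with a hypothetical reader interface instead of the Astyanax query builder:

    // Hypothetical consistency levels; the real code passes Astyanax ConsistencyLevel values.
    enum Consistency { NORMAL, STRONG }

    // Hypothetical row reader standing in for the prepared column query.
    interface RowReader {
        String read( String key, Consistency consistency );
    }

    class MapReadSketch {

        private final RowReader reader;

        MapReadSketch( final RowReader reader ) {
            this.reader = reader;
        }

        // Single read path; callers choose the consistency instead of duplicating the query code.
        private String getValue( final String key, final Consistency consistency ) {
            return reader.read( key, consistency );
        }

        String getString( final String key ) {
            return getValue( key, Consistency.NORMAL );   // everyday read
        }

        String getStringHighConsistency( final String key ) {
            return getValue( key, Consistency.STRONG );   // stronger read used by the fallback path
        }
    }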

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e50835f1/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 1bb00dc..bc5f2f1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -438,7 +438,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
                     /**
                      * When a message originates from SNS it has a "Message"  we have to extract
-                     * it and then process it seperately
+                     * it and then process it separately
                      */
 
 
@@ -546,7 +546,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         if ( sqsAsync == null ) {
             logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
             return;
-        }   
+        }
 
         for ( Object body : bodies ) {
             sendMessage( ( Serializable ) body );


[09/18] usergrid git commit: Merge branch 'USERGRID-1048' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048

Posted by to...@apache.org.
Merge branch 'USERGRID-1048' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1048


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d8e65721
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d8e65721
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d8e65721

Branch: refs/heads/USERGRID-1052
Commit: d8e6572196b1c9245854ce1351d4d2171fbde80b
Parents: 3a7e60b 3ec0f58
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:31 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:31 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |   4 +
 .../corepersistence/index/PublishRxTest.java    |  95 ----------------
 .../usergrid/corepersistence/index/RxTest.java  | 108 +++++++++++++++++++
 .../persistence/core/astyanax/CassandraFig.java |   6 +-
 4 files changed, 115 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d8e65721/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------


[02/18] usergrid git commit: Fixes serialization tests and verifies full end to end functionality.

Posted by to...@apache.org.
Fixes serialization tests and verifies full end to end functionality.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/04a3f47b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/04a3f47b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/04a3f47b

Branch: refs/heads/USERGRID-1052
Commit: 04a3f47bd86ec0674ff487f479327e0f174f0425
Parents: 94a9078
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 12:00:56 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 12:00:56 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  70 ++-
 .../asyncevents/model/AsyncEvent.java           |   5 +-
 .../index/IndexProcessorFig.java                |   7 +-
 .../util/ObjectJsonSerializer.java              |  28 +-
 .../persistence/queue/QueueManager.java         |   2 +-
 .../persistence/queue/guice/QueueModule.java    |   1 -
 .../queue/impl/SNSQueueManagerImpl.java         | 515 ++++++++++---------
 .../queue/impl/SQSQueueManagerImpl.java         | 362 -------------
 8 files changed, 336 insertions(+), 654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c9f0953..d319ac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,19 +29,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.google.common.base.Optional;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
 import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
@@ -50,6 +44,8 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -61,6 +57,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
@@ -78,6 +75,7 @@ import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -94,8 +92,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
 
-    private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer(  );
-
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
@@ -192,6 +188,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
         }
     }
 
+
+    private void offerTopic( final Serializable operation ) {
+        final Timer.Context timer = this.writeTimer.time();
+
+        try {
+            //signal to SQS
+            this.queue.sendMessageToTopic( operation );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+        finally {
+            timer.stop();
+        }
+    }
+
     private void offerBatch(final List operations){
         final Timer.Context timer = this.writeTimer.time();
 
@@ -226,27 +238,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
-    /**
-     * Ack message in SQS
-     */
-    public void ack(final QueueMessage message) {
-
-        final Timer.Context timer = this.ackTimer.time();
-
-        try{
-            queue.commitMessage( message );
-
-            //decrement our in-flight counter
-            inFlight.decrementAndGet();
-
-        }catch(Exception e){
-            throw new RuntimeException("Unable to ack messages", e);
-        }finally {
-            timer.stop();
-        }
-
-
-    }
 
     /**
      * Ack message in SQS
@@ -355,7 +346,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
         IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
             applicationScope );
-        offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+        offerTopic(
+            new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
     }
 
 
@@ -468,7 +460,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
      */
     public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
 
-        final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
 
         final UUID newMessageId = UUIDGenerator.newTimeUUID();
 
@@ -482,12 +474,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
 
         //send to the topic so all regions index the batch
-        try {
-            queue.sendMessageToTopic( elasticsearchIndexEvent );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( "Unable to pulish to topic", e );
-        }
+
+        offerTopic( elasticsearchIndexEvent );
     }
 
     public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
@@ -505,7 +493,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         String highConsistency = null;
 
         if(message == null){
-            logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
 
             highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 
@@ -517,11 +505,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //our original local read has it, parse it.
         if(message != null){
-             indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
         }
         //we tried to read it at a higher consistency level and it works
         else if (highConsistency != null){
-            indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
         }
 
         //we couldn't find it, bail

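The handleIndexOperation hunk above implements a read-then-escalate pattern: the serialized batch is first read normally from the map, and only if that misses is it re-read at a higher consistency level before giving up. A minimal sketch of that flow, assuming esMapPersistence.getString is the ordinary MapManager read; the helper name loadIndexOperation is illustrative, not part of the patch.

    // Sketch only: mirrors the fallback logic in handleIndexOperation above.
    private IndexOperationMessage loadIndexOperation( final UUID messageId ) {

        // 1. ordinary read of the persisted batch (assumed MapManager accessor)
        final String message = esMapPersistence.getString( messageId.toString() );

        if ( message != null ) {
            return ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
        }

        // 2. the write may not have replicated to this region yet -- retry with a stronger read
        final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );

        if ( highConsistency != null ) {
            return ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
        }

        // 3. still nothing: the caller bails, or leaves the queue message to be redelivered
        return null;
    }
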
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 1af54e3..7c51003 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -30,8 +30,11 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /**
  * Marker class for serialization
+ *
+ * Note that when you add a subtype, you will need to add its serialization value below in the JsonSubTypes annotation.
+ *
+ * Each name must be unique, and must map to a subclass that is serialized
  */
-
 @JsonIgnoreProperties( ignoreUnknown = true )
 @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
 @JsonSubTypes( {

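The annotations above are what make the queued payloads self-describing: Id.NAME plus As.WRAPPER_OBJECT wraps each event in a single-key JSON object whose key is the registered subtype name. A standalone illustration with stand-in classes (Ping is hypothetical, not one of the real Usergrid event types):

    import com.fasterxml.jackson.annotation.JsonSubTypes;
    import com.fasterxml.jackson.annotation.JsonTypeInfo;
    import com.fasterxml.jackson.databind.ObjectMapper;

    @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
    @JsonSubTypes( {
        @JsonSubTypes.Type( value = Ping.class, name = "ping" ) // every subtype needs a unique entry here
    } )
    abstract class Event {}

    class Ping extends Event {
        public String source = "local";
    }

    public class SubTypeDemo {
        public static void main( String[] args ) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            // serializes as {"ping":{"source":"local"}}
            String json = mapper.writeValueAsString( new Ping() );

            // without the @JsonSubTypes entry this readValue would fail with an unknown type id
            Event roundTripped = mapper.readValue( json, Event.class );
            System.out.println( json + " -> " + roundTripped.getClass().getSimpleName() );
        }
    }
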
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 6fd73b4..ec9b315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -105,10 +105,13 @@ public interface IndexProcessorFig extends GuicyFig {
     boolean resolveSynchronously();
 
     /**
-     * Get the message TTL in milliseconds
+     * Get the message TTL in milliseconds.  Defaults to 24 hours
+     *
+     * 24 * 60 * 60 * 1000
+     *
      * @return
      */
-    @Default("604800000")
+    @Default("86400000")
     @Key( "elasticsearch.message.ttl" )
     int getIndexMessageTtl();
 }

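For reference, the two defaults differ by a factor of seven: the old value was seven days of milliseconds, the new one is twenty-four hours. A quick check (the TimeUnit conversion is only for illustration):

    import java.util.concurrent.TimeUnit;

    public class TtlCheck {
        public static void main( String[] args ) {
            long oldDefault = TimeUnit.DAYS.toMillis( 7 );    // 604800000, the previous default
            long newDefault = TimeUnit.HOURS.toMillis( 24 );  // 86400000 = 24 * 60 * 60 * 1000
            System.out.println( oldDefault + " -> " + newDefault );
        }
    }
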
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index dbd5ca3..4e5873a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -33,16 +34,33 @@ import com.google.common.base.Preconditions;
 public final class ObjectJsonSerializer {
 
 
-    private final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
 
-    private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+    private static final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+    static{
+
+           /**
+            * Because of the way SNS escapes all our json, we have to tell jackson to accept it.  See the documentation
+            * here for how SNS borks the message body
+            *
+            *  http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+            */
+            MAPPER.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+       }
+
+    /**
+     * Singleton instance of our serializer, instantiating it and configuring the mapper is expensive.
+     */
+    public static final ObjectJsonSerializer INSTANCE = new ObjectJsonSerializer();
+
+
+    private ObjectJsonSerializer( ) {
 
-    public ObjectJsonSerializer( ) {
-        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
     }
 
 
-    public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+    public <T extends Serializable> String toString( final T toSerialize ) {
 
         Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
         final String stringValue;

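With the constructor made private and INSTANCE exposed, callers share one pre-configured mapper instead of building a new ObjectMapper per message. A minimal usage sketch (not from the patch; the IndexOperationMessage argument is just a placeholder value, while the toString/fromString calls are the ones used in the AmazonAsyncEventService hunk above):

    // Round-trip a batch through the shared serializer.
    static IndexOperationMessage roundTrip( final IndexOperationMessage batch ) {

        // single shared, pre-configured instance -- constructing mappers repeatedly is expensive
        final String json = ObjectJsonSerializer.INSTANCE.toString( batch );

        // ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER (set in the static block above) keeps this
        // parse working even after SNS has re-escaped the payload it hands to SQS
        return ObjectJsonSerializer.INSTANCE.fromString( json, IndexOperationMessage.class );
    }
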
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dc3d1b5..34a3654 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,7 @@ public interface QueueManager {
      * @param body
      * @throws IOException
      */
-    <T extends Serializable> void  sendMessage(T body)throws IOException;
+    <T extends Serializable> void sendMessage(T body)throws IOException;
 
     /**
     * Send a message to the topic to be sent to other queues

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index dd1fe16..caf61bf 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -26,7 +26,6 @@ import org.safehaus.guicyfig.GuicyFigModule;
 import org.apache.usergrid.persistence.queue.QueueFig;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 59ecd24..a3fa05e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -68,6 +68,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
@@ -77,9 +78,10 @@ import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
+
 public class SNSQueueManagerImpl implements QueueManager {
 
-    private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
 
     private final QueueScope scope;
     private final QueueFig fig;
@@ -91,55 +93,64 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final AmazonSQSAsyncClient sqsAsync;
 
 
-    private final JsonFactory JSON_FACTORY = new JsonFactory();
-    private final ObjectMapper mapper = new ObjectMapper(JSON_FACTORY);
+    private static final JsonFactory JSON_FACTORY = new JsonFactory();
+    private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
+
+    static {
+
+        /**
+         * Because of the way SNS escapes all our json, we have to tell jackson to accept it.  See the documentation
+         * here for how SNS borks the message body
+         *
+         *  http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+         */
+        mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+    }
 
 
-    private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, String>() {
+    private final LoadingCache<String, String> writeTopicArnMap =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() {
             @Override
-            public String load(String queueName)
-                throws Exception {
+            public String load( String queueName ) throws Exception {
 
-                return setupTopics(queueName);
+                return setupTopics( queueName );
             }
-        });
+        } );
 
-    private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, Queue>() {
+    private final LoadingCache<String, Queue> readQueueUrlMap =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
             @Override
-            public Queue load(String queueName) throws Exception {
+            public Queue load( String queueName ) throws Exception {
 
                 Queue queue = null;
 
                 try {
-                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
-                    queue = new Queue(result.getQueueUrl());
-                } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                    logger.error("Queue {} does not exist, will create", queueName);
-                } catch (Exception e) {
-                    logger.error("failed to get queue from service", e);
+                    GetQueueUrlResult result = sqs.getQueueUrl( queueName );
+                    queue = new Queue( result.getQueueUrl() );
+                }
+                catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+                    logger.error( "Queue {} does not exist, will create", queueName );
+                }
+                catch ( Exception e ) {
+                    logger.error( "failed to get queue from service", e );
                     throw e;
                 }
 
-                if (queue == null) {
-                    String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
-                    queue = new Queue(url);
+                if ( queue == null ) {
+                    String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+                    queue = new Queue( url );
                 }
 
-                setupTopics(queueName);
+                setupTopics( queueName );
 
                 return queue;
             }
-        });
-
+        } );
 
 
     @Inject
-    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
-                               CassandraFig cassandraFig, QueueFig queueFig) {
+    public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+                                CassandraFig cassandraFig, QueueFig queueFig ) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
@@ -148,177 +159,179 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
         final ExecutorService executor = TaskExecutorFactory
-            .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
-                TaskExecutorFactory.RejectionAction.CALLERRUNS);
+            .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+                TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
 
         final Region region = getRegion();
 
         try {
-            sqs = createSQSClient(region);
-            sns = createSNSClient(region);
-            snsAsync = createAsyncSNSClient(region, executor);
+            sqs = createSQSClient( region );
+            sns = createSNSClient( region );
+            snsAsync = createAsyncSNSClient( region, executor );
             sqsAsync = createAsyncSQSClient( region, executor );
-
-        } catch (Exception e) {
-            throw new RuntimeException("Error setting up mapper", e);
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Error setting up mapper", e );
         }
     }
 
-    private String setupTopics(final String queueName)
-        throws Exception {
 
-        logger.info("Setting up setupTopics SNS/SQS...");
+    private String setupTopics( final String queueName ) throws Exception {
 
-        String primaryTopicArn = AmazonNotificationUtils.getTopicArn(sns, queueName, true);
+        logger.info( "Setting up setupTopics SNS/SQS..." );
 
-        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+        String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );
 
-        String queueUrl = AmazonNotificationUtils.getQueueUrlByName(sqs, queueName);
-        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(sqs, queueName);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn );
+        }
+
+        String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
+        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );
 
-        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn );
+        }
 
-        if (primaryQueueArn == null) {
-            if (logger.isDebugEnabled())
-                logger.debug("SNS/SQS Setup: primaryQueueArn is null, creating queue...");
+        if ( primaryQueueArn == null ) {
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
+            }
 
-            queueUrl = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
-            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(sqs, queueUrl);
+            queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );
 
-            if (logger.isDebugEnabled())
-                logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
+            }
         }
 
         try {
 
-            SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
-             sns.subscribe(primarySubscribeRequest);
+            SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
+            sns.subscribe( primarySubscribeRequest );
 
             // ensure the SNS primary topic has permission to send to the primary SQS queue
             List<String> primaryTopicArnList = new ArrayList<>();
-            primaryTopicArnList.add(primaryTopicArn);
-            AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
-        } catch (AmazonServiceException e) {
-            logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
+            primaryTopicArnList.add( primaryTopicArn );
+            AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
+        }
+        catch ( AmazonServiceException e ) {
+            logger.error(
+                String.format( "Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn ), e );
         }
 
-        if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL) {
+        if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
 
             String multiRegion = fig.getRegionList();
 
-            if (logger.isDebugEnabled())
-                logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+            }
 
-            String[] regionNames = multiRegion.split(",");
+            String[] regionNames = multiRegion.split( "," );
 
-            final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
-            final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
 
-            arrQueueArns.put(primaryQueueArn, fig.getRegion());
-            topicArns.put(primaryTopicArn, fig.getRegion());
+            arrQueueArns.put( primaryQueueArn, fig.getRegion() );
+            topicArns.put( primaryTopicArn, fig.getRegion() );
 
-            for (String regionName : regionNames) {
+            for ( String regionName : regionNames ) {
 
                 regionName = regionName.trim();
-                Regions regions = Regions.fromName(regionName);
-                Region region = Region.getRegion(regions);
+                Regions regions = Regions.fromName( regionName );
+                Region region = Region.getRegion( regions );
 
-                AmazonSQSClient sqsClient = createSQSClient(region);
-                AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+                AmazonSQSClient sqsClient = createSQSClient( region );
+                AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
 
                 // getTopicArn will create the SNS topic if it doesn't exist
-                String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
-                topicArns.put(topicArn, regionName);
+                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+                topicArns.put( topicArn, regionName );
 
                 // create the SQS queue if it doesn't exist
-                String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
-                if (queueArn == null) {
-                    queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
-                    queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+                if ( queueArn == null ) {
+                    queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
                 }
 
-                arrQueueArns.put(queueArn, regionName);
+                arrQueueArns.put( queueArn, regionName );
             }
 
-            logger.debug("Creating Subscriptions...");
+            logger.debug( "Creating Subscriptions..." );
 
-            for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                 String queueARN = queueArnEntry.getKey();
                 String strSqsRegion = queueArnEntry.getValue();
 
-                Regions sqsRegions = Regions.fromName(strSqsRegion);
-                Region sqsRegion = Region.getRegion(sqsRegions);
+                Regions sqsRegions = Regions.fromName( strSqsRegion );
+                Region sqsRegion = Region.getRegion( sqsRegions );
 
-                AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
+                AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion );
 
                 // ensure the URL used to subscribe is for the correct name/region
-                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
+                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );
 
                 // this list used later for adding permissions to queues
                 List<String> topicArnList = new ArrayList<>();
 
-                for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
+                for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {
 
                     String topicARN = topicArnEntry.getKey();
-                    topicArnList.add(topicARN);
+                    topicArnList.add( topicARN );
 
                     String strSnsRegion = topicArnEntry.getValue();
-                    Regions snsRegions = Regions.fromName(strSnsRegion);
-                    Region snsRegion = Region.getRegion(snsRegions);
+                    Regions snsRegions = Regions.fromName( strSnsRegion );
+                    Region snsRegion = Region.getRegion( snsRegions );
 
-                    AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
-                    SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
+                    AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
+                    SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );
 
                     try {
 
-                        logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
-                            queueARN,
-                            strSqsRegion,
-                            topicARN,
-                            strSnsRegion
-                        );
+                        logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
+                            strSqsRegion, topicARN, strSnsRegion );
 
-                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
                         String subscriptionARN = subscribeResult.getSubscriptionArn();
-                        if(logger.isDebugEnabled()){
-                            logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug(
+                                "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
+                                queueARN, topicARN, subscriptionARN );
                         }
-
-
-                    } catch (Exception e) {
-                        logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
-                            queueARN,
-                            strSqsRegion,
-                            topicARN,
-                            strSnsRegion), e);
-
-
+                    }
+                    catch ( Exception e ) {
+                        logger.error( String
+                            .format( "ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
+                                queueARN, strSqsRegion, topicARN, strSnsRegion ), e );
                     }
                 }
 
-                logger.info("Adding permission to receive messages...");
+                logger.info( "Adding permission to receive messages..." );
                 // add permission to each queue, providing a list of topics that it's subscribed to
-                AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
-
+                AmazonNotificationUtils
+                    .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
             }
         }
 
         return primaryTopicArn;
     }
 
+
     /**
      * The Asynchronous SNS client is used for publishing events to SNS.
-     *
      */
 
-    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
+    private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
 
-        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
+        final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor );
 
-        sns.setRegion(region);
+        sns.setRegion( region );
 
         return sns;
     }
@@ -326,11 +339,8 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     /**
      * Create the async sqs client
-     * @param region
-     * @param executor
-     * @return
      */
-    private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+    private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
         final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
@@ -338,173 +348,209 @@ public class SNSQueueManagerImpl implements QueueManager {
         sqs.setRegion( region );
 
         return sqs;
-
     }
 
+
     /**
      * The Synchronous SNS client is used for creating topics and subscribing queues.
-     *
      */
-    private AmazonSNSClient createSNSClient(final Region region) {
+    private AmazonSNSClient createSNSClient( final Region region ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
-        final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+        final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() );
 
-        sns.setRegion(region);
+        sns.setRegion( region );
 
         return sns;
     }
 
 
     private String getName() {
-        String name = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
+        String name =
+            clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+                + scope.getRegionImplementation();
         name = name.toLowerCase(); //use lower case values
-        Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
+        Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
 
         return name;
     }
 
+
     public Queue getReadQueue() {
         String queueName = getName();
 
         try {
-            return readQueueUrlMap.get(queueName);
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
+            return readQueueUrlMap.get( queueName );
+        }
+        catch ( ExecutionException ee ) {
+            throw new RuntimeException( ee );
         }
     }
 
+
     public String getWriteTopicArn() {
         try {
-            return writeTopicArnMap.get(getName());
-
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
+            return writeTopicArnMap.get( getName() );
+        }
+        catch ( ExecutionException ee ) {
+            throw new RuntimeException( ee );
         }
     }
 
+
     @Override
-    public rx.Observable<QueueMessage> getMessages(final int limit,
-                                                   final int transactionTimeout,
-                                                   final int waitTime,
-                                                   final Class klass) {
+    public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+                                                    final Class klass ) {
 
-        if (sqs == null) {
-            logger.error("SQS is null - was not initialized properly");
+        if ( sqs == null ) {
+            logger.error( "SQS is null - was not initialized properly" );
             return rx.Observable.empty();
         }
 
         String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Getting up to {} messages from {}", limit, url );
+        }
 
-        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
-        receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000));
-        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+        receiveMessageRequest.setMaxNumberOfMessages( limit );
+        receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
+        receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
 
         try {
-            ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+            ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
             List<Message> messages = result.getMessages();
 
-            if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Received {} messages from {}", messages.size(), url );
+            }
+
+            List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
 
-            List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+            for ( Message message : messages ) {
 
-            for (Message message : messages) {
-                Object body;
+                Object payload;
                 final String originalBody = message.getBody();
 
                 try {
-                    final JsonNode bodyNode =  mapper.readTree(message.getBody());
-                    JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
+                    final JsonNode bodyNode = mapper.readTree( message.getBody() );
 
+                    /**
+                     * When a message originates from SNS it has a "Message"  we have to extract
+                     * it and then process it seperately
+                     */
 
 
-                    final String bodyText = mapper.writeValueAsString( bodyObj );;
+                    if ( bodyNode.has( "Message" ) ) {
+                        final String snsNode = bodyNode.get( "Message" ).asText();
 
-                    body = fromString(bodyText, klass);
-                } catch (Exception e) {
-                    logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
-                    throw new RuntimeException(e);
+                        payload = deSerializeSQSMessage( snsNode, klass );
+                    }
+                    else {
+                        payload = deSerializeSQSMessage( originalBody, klass );
+                    }
+                }
+                catch ( Exception e ) {
+                    logger.error( String.format( "failed to deserialize message: %s", message.getBody() ), e );
+                    throw new RuntimeException( e );
                 }
 
-                QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
-                queueMessage.setStringBody(originalBody);
-                queueMessages.add(queueMessage);
+                QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+                    message.getAttributes().get( "type" ) );
+                queueMessage.setStringBody( originalBody );
+                queueMessages.add( queueMessage );
             }
 
-            return rx.Observable.from(queueMessages);
-
-        } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
-            logger.error(String.format("Queue does not exist! [%s]", url), dne);
-        } catch (Exception e) {
-            logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
+            return rx.Observable.from( queueMessages );
+        }
+        catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
+            logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
+        }
+        catch ( Exception e ) {
+            logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
         }
 
-        return rx.Observable.from(new ArrayList<>(0));
+        return rx.Observable.from( new ArrayList<>( 0 ) );
     }
 
+
+    /**
+     * Take a string, possibly escaped via SNS, and run it through our mapper to create an object
+     */
+    private Object deSerializeSQSMessage( final String message, final Class type ) {
+        try {
+            final Object o = mapper.readValue( message, type );
+            return o;
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e );
+        }
+    }
+
+
     @Override
     public long getQueueDepth() {
         String key = "ApproximateNumberOfMessages";
         try {
-            GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key));
-            String depthString = result.getAttributes().get(key);
-            return depthString != null ? Long.parseLong(depthString) : 0;
-        }catch (Exception e){
-            logger.error("Exception getting queue depth",e);
+            GetQueueAttributesResult result =
+                sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) );
+            String depthString = result.getAttributes().get( key );
+            return depthString != null ? Long.parseLong( depthString ) : 0;
+        }
+        catch ( Exception e ) {
+            logger.error( "Exception getting queue depth", e );
             return -1;
-
         }
     }
 
 
     @Override
     public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
-        if (snsAsync == null) {
-                   logger.error("SNS client is null, perhaps it failed to initialize successfully");
-                   return;
-               }
-
-               final String stringBody = toString(body);
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+            return;
+        }
 
-               String topicArn = getWriteTopicArn();
+        final String stringBody = toString( body );
 
-               if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+        String topicArn = getWriteTopicArn();
 
-               PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+        }
 
-               snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
-                   @Override
-                   public void onError( Exception e ) {
-                       logger.error( "Error publishing message... {}", e );
-                   }
+        PublishRequest publishRequest = new PublishRequest( topicArn, stringBody );
 
+        snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+            @Override
+            public void onError( Exception e ) {
+                logger.error( "Error publishing message... {}", e );
+            }
 
-                   @Override
-                   public void onSuccess( PublishRequest request, PublishResult result ) {
-                       if ( logger.isDebugEnabled() ) logger
-                           .debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
-                               request.getTopicArn() );
-                   }
-               } );
 
+            @Override
+            public void onSuccess( PublishRequest request, PublishResult result ) {
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug( "Successfully published... messageID=[{}],  arn=[{}]", result.getMessageId(),
+                        request.getTopicArn() );
+                }
+            }
+        } );
     }
 
 
     @Override
-    public void sendMessages(final List bodies) throws IOException {
+    public void sendMessages( final List bodies ) throws IOException {
 
-        if (snsAsync == null) {
-            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+        if ( snsAsync == null ) {
+            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
             return;
         }
 
-        for (Object body : bodies) {
-            sendMessage((Serializable)body);
+        for ( Object body : bodies ) {
+            sendMessage( ( Serializable ) body );
         }
-
     }
 
 
@@ -545,94 +591,81 @@ public class SNSQueueManagerImpl implements QueueManager {
         } );
     }
 
+
     @Override
     public void deleteQueue() {
-        logger.warn("Deleting queue: "+getReadQueue().getUrl());
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
-        logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
-
+        logger.warn( "Deleting queue: " + getReadQueue().getUrl() );
+        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) );
+        logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" );
+        sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) );
     }
 
 
     @Override
-    public void commitMessage(final QueueMessage queueMessage) {
+    public void commitMessage( final QueueMessage queueMessage ) {
         String url = getReadQueue().getUrl();
-        if (logger.isDebugEnabled())
-            logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
+        }
 
-        sqs.deleteMessage(new DeleteMessageRequest()
-            .withQueueUrl(url)
-            .withReceiptHandle(queueMessage.getHandle()));
+        sqs.deleteMessage(
+            new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) );
     }
 
 
     @Override
-    public void commitMessages(final List<QueueMessage> queueMessages) {
+    public void commitMessages( final List<QueueMessage> queueMessages ) {
         String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Commit messages {} to queue {}", queueMessages.size(), url );
+        }
 
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
 
-        for (QueueMessage message : queueMessages) {
-            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+        for ( QueueMessage message : queueMessages ) {
+            entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
         }
 
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
-        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries );
+        DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
 
         boolean successful = result.getFailed().size() <= 0;
 
-        if (!successful) {
-            for (BatchResultErrorEntry failed : result.getFailed()) {
-                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+        if ( !successful ) {
+            for ( BatchResultErrorEntry failed : result.getFailed() ) {
+                logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() );
             }
         }
     }
 
 
     /**
-     * Read the object from Base64 string.
-     */
-
-    private Object fromString(final String s, final Class klass)
-        throws IOException, ClassNotFoundException {
-
-        Object o = mapper.readValue(s, klass);
-        return o;
-    }
-
-    /**
      * Write the object to a Base64 string.
      */
-    private String toString(final Object o) throws IOException {
-        return mapper.writeValueAsString(o);
+    private String toString( final Object o ) throws IOException {
+        return mapper.writeValueAsString( o );
     }
 
 
     /**
      * Get the region
-     *
-     * @return
      */
     private Region getRegion() {
-        Regions regions = Regions.fromName(fig.getRegion());
-        return Region.getRegion(regions);
+        Regions regions = Regions.fromName( fig.getRegion() );
+        return Region.getRegion( regions );
     }
 
 
     /**
      * Create the SQS client for the specified settings
      */
-    private AmazonSQSClient createSQSClient(final Region region) {
+    private AmazonSQSClient createSQSClient( final Region region ) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
+        final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
 
-        sqs.setRegion(region);
+        sqs.setRegion( region );
 
         return sqs;
     }
-
-
 }

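The getMessages()/deSerializeSQSMessage() pair above handles the fact that a message arriving via SNS is wrapped in a notification envelope: the real payload is the string value of its "Message" field, while a message sent straight to SQS is the payload itself. A self-contained illustration of that unwrapping (the envelope literal is trimmed down; real notifications also carry MessageId, TopicArn and other fields):

    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class EnvelopeDemo {
        public static void main( String[] args ) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // same feature the patch enables, since SNS backslash-escapes the inner JSON
            mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );

            String sqsBody = "{\"Type\":\"Notification\",\"Message\":\"{\\\"example\\\":true}\"}";

            JsonNode bodyNode = mapper.readTree( sqsBody );

            // messages published directly to SQS have no "Message" wrapper
            String payload = bodyNode.has( "Message" )
                ? bodyNode.get( "Message" ).asText()  // inner JSON, already un-escaped by the parser
                : sqsBody;

            System.out.println( payload );            // {"example":true}
        }
    }
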
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
deleted file mode 100644
index 0c56c05..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class SQSQueueManagerImpl implements QueueManager {
-    private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
-
-
-    private final QueueScope scope;
-    private ObjectMapper mapper;
-    protected final QueueFig fig;
-    private final ClusterFig clusterFig;
-    protected final AmazonSQSClient sqs;
-
-    private static SmileFactory smileFactory = new SmileFactory();
-
-    private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
-        .maximumSize(1000)
-        .build(new CacheLoader<String, Queue>() {
-            @Override
-            public Queue load(String queueName) throws Exception {
-
-                //the amazon client is not thread safe, we need to create one per queue
-                Queue queue = null;
-
-                try {
-
-                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
-                    queue = new Queue(result.getQueueUrl());
-
-                } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                    //no op, swallow
-                    logger.error("Queue {} does not exist, creating", queueName);
-
-                } catch (Exception e) {
-                    logger.error("failed to get queue from service", e);
-                    throw e;
-                }
-
-                if (queue == null) {
-
-                    final String deadletterQueueName = String.format("%s_dead", queueName);
-                    final Map<String, String> deadLetterAttributes = new HashMap<>(2);
-
-                    deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
-                    CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
-                        .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
-
-                    final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
-                    logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
-
-                    final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
-
-                    String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
-                        " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
-
-                    final Map<String, String> queueAttributes = new HashMap<>(2);
-                    deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
-                    deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
-
-                    CreateQueueRequest createQueueRequest = new CreateQueueRequest().
-                        withQueueName(queueName)
-                        .withAttributes(queueAttributes);
-
-                    CreateQueueResult result = sqs.createQueue(createQueueRequest);
-
-                    String url = result.getQueueUrl();
-                    queue = new Queue(url);
-
-                    logger.info("Created queue with url {}", url);
-                }
-
-                return queue;
-            }
-        });
-
-
-    @Inject
-    public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig, final ClusterFig clusterFig) {
-
-        this.scope = scope;
-        this.fig = fig;
-        this.clusterFig = clusterFig;
-        try {
-
-            smileFactory.delegateToTextual(true);
-            mapper = new ObjectMapper(smileFactory);
-            //pretty print, disabling for speed
-//            mapper.enable(SerializationFeature.INDENT_OUTPUT);
-            mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
-            sqs = createClient();
-
-        } catch (Exception e) {
-            throw new RuntimeException("Error setting up mapper", e);
-        }
-    }
-
-
-    protected String getName() {
-
-        String name = clusterFig.getClusterName() + "_" + scope.getName();
-
-        Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
-
-        return name;
-    }
-
-    public Queue getQueue() {
-
-        try {
-            Queue queue = urlMap.get(getName());
-            return queue;
-        } catch (ExecutionException ee) {
-            throw new RuntimeException(ee);
-        }
-    }
-
-    @Override
-    public rx.Observable<QueueMessage> getMessages(final int limit,
-                                          final int transactionTimeout,
-                                          final int waitTime,
-                                          final Class klass) {
-
-        if (sqs == null) {
-            logger.error("Sqs is null");
-            return rx.Observable.empty();
-        }
-
-        String url = getQueue().getUrl();
-
-        if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
-
-        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
-        receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
-        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
-        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
-        List<Message> messages = result.getMessages();
-
-        if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
-
-        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
-
-        for (Message message : messages) {
-            Object body;
-
-            try {
-                body = fromString(message.getBody(), klass);
-            } catch (Exception e) {
-                logger.error("failed to deserialize message", e);
-                throw new RuntimeException(e);
-            }
-
-            QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
-            queueMessage.setStringBody(message.getBody());
-            queueMessages.add(queueMessage);
-        }
-
-        return rx.Observable.from(queueMessages);
-    }
-
-    @Override
-    public long getQueueDepth() {
-        String key = "ApproximateNumberOfMessages";
-        try {
-            GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
-            String depthString = result.getAttributes().get(key);
-            return depthString != null ? Long.parseLong(depthString) : 0;
-        }catch (Exception e){
-            logger.error("Exception getting queue depth",e);
-            return -1;
-
-        }
-    }
-    @Override
-    public void sendMessages(final List bodies) throws IOException {
-
-        if (sqs == null) {
-            logger.error("Sqs is null");
-            return;
-        }
-        String url = getQueue().getUrl();
-
-        if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
-
-        SendMessageBatchRequest request = new SendMessageBatchRequest(url);
-        List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-
-        for (Object body : bodies) {
-            SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
-            entry.setId(UUID.randomUUID().toString());
-            entry.setMessageBody(toString(body));
-            entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
-            entries.add(entry);
-        }
-
-        request.setEntries(entries);
-        sqs.sendMessageBatch(request);
-
-    }
-
-
-    @Override
-    public <T extends Serializable> void sendMessage( final T body ) throws IOException {
-
-        if (sqs == null) {
-              logger.error("Sqs is null");
-              return;
-          }
-
-          String url = getQueue().getUrl();
-
-          if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
-
-          final String stringBody = toString(body);
-
-          SendMessageRequest request = new SendMessageRequest(url, stringBody);
-          sqs.sendMessage(request);
-      }
-
-
-
-    @Override
-    public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
-        sendMessage( body );
-    }
-
-
-
-    @Override
-    public void commitMessage(final QueueMessage queueMessage) {
-
-        String url = getQueue().getUrl();
-        if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
-
-        sqs.deleteMessage(new DeleteMessageRequest()
-            .withQueueUrl(url)
-            .withReceiptHandle(queueMessage.getHandle()));
-    }
-
-
-    @Override
-    public void commitMessages(final List<QueueMessage> queueMessages) {
-
-        String url = getQueue().getUrl();
-        if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
-
-        List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-
-        for (QueueMessage message : queueMessages) {
-            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
-        }
-
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
-        DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
-
-        boolean successful = result.getFailed().size() <= 0;
-
-        if (!successful) {
-
-            for (BatchResultErrorEntry failed : result.getFailed()) {
-                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
-            }
-        }
-    }
-
-
-    /**
-     * Read the object from Base64 string.
-     */
-    private Object fromString(final String s,
-                              final Class klass) throws IOException, ClassNotFoundException {
-        Object o = mapper.readValue(s, klass);
-        return o;
-    }
-
-    /**
-     * Write the object to a Base64 string.
-     */
-    protected String toString(final Object o) throws IOException {
-        return mapper.writeValueAsString(o);
-    }
-
-
-    /**
-     * Get the region
-     *
-     * @return
-     */
-    protected Region getRegion() {
-        Regions regions = Regions.fromName(fig.getRegion());
-        Region region = Region.getRegion(regions);
-        return region;
-    }
-
-    @Override
-    public void deleteQueue() {
-        logger.warn("Deleting queue: "+getQueue().getUrl());
-        sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
-    }
-
-
-
-    /**
-     * Create the SQS client for the specified settings
-     */
-    private AmazonSQSClient createClient() {
-        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
-        final Region region = getRegion();
-        sqs.setRegion(region);
-
-        return sqs;
-    }
-
-
-}

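For context on the serialization the removed manager relied on: message bodies were round-tripped through a Smile-backed Jackson ObjectMapper with default typing, exactly as the deleted constructor and the fromString/toString helpers above show. A minimal sketch of that round trip, assuming a hypothetical TestBody message class (the mapper settings are copied from the deleted code; everything else is illustrative):

import java.io.Serializable;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;

public class QueueBodyRoundTripSketch {

    // Hypothetical message body, used only for this example.
    public static class TestBody implements Serializable {
        public String payload;

        public TestBody() {
        }

        public TestBody( final String payload ) {
            this.payload = payload;
        }
    }

    public static void main( final String[] args ) throws Exception {

        final SmileFactory smileFactory = new SmileFactory();

        // delegate to textual JSON so writeValueAsString yields readable text rather than binary Smile
        smileFactory.delegateToTextual( true );

        final ObjectMapper mapper = new ObjectMapper( smileFactory );

        // record the concrete class as an "@class" property so Object-typed fields keep their subtype
        mapper.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );

        final String wire = mapper.writeValueAsString( new TestBody( "hello" ) );
        final TestBody restored = mapper.readValue( wire, TestBody.class );

        System.out.println( wire );
        System.out.println( restored.payload );
    }
}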

[13/18] usergrid git commit: Merge commit 'refs/pull/407/head' of apache.github.com:apache/usergrid into 2.1-release

Posted by to...@apache.org.
Merge commit 'refs/pull/407/head' of apache.github.com:apache/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0604247f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0604247f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0604247f

Branch: refs/heads/USERGRID-1052
Commit: 0604247f690db09b1a25feef9532cb5e58929437
Parents: 1fe1d1a 48161a1
Author: Mike Dunker <md...@apigee.com>
Authored: Thu Oct 22 11:27:59 2015 -0700
Committer: Mike Dunker <md...@apigee.com>
Committed: Thu Oct 22 11:27:59 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |   3 +-
 .../persistence/core/astyanax/CassandraFig.java |   2 +-
 .../core/astyanax/MultiRowColumnIterator.java   |  46 +--
 .../usergrid/persistence/graph/GraphFig.java    |   2 +-
 .../persistence/graph/guice/GraphModule.java    |  11 +-
 .../impl/EdgeSerializationImpl.java             |  17 +-
 .../impl/shard/AsyncTaskExecutor.java           |  34 ++
 .../graph/serialization/impl/shard/Shard.java   |  15 +
 .../impl/shard/ShardEntryGroup.java             |  13 +-
 .../impl/shard/ShardGroupCompaction.java        |   4 -
 .../impl/shard/ShardGroupDeletion.java          |  78 ++++
 .../impl/shard/impl/AsyncTaskExecutorImpl.java  |  53 +++
 .../shard/impl/NodeShardAllocationImpl.java     |  81 ++--
 .../shard/impl/ShardGroupColumnIterator.java    |  72 ++--
 .../shard/impl/ShardGroupCompactionImpl.java    |  10 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java | 219 +++++++++++
 .../impl/shard/impl/ShardsColumnIterator.java   |  10 +
 .../graph/GraphManagerShardConsistencyIT.java   | 378 ++++++++++++++-----
 .../impl/shard/ShardEntryGroupTest.java         |  14 +
 .../impl/shard/ShardGroupCompactionTest.java    |  30 +-
 .../shard/impl/ShardGroupDeletionImplTest.java  | 341 +++++++++++++++++
 .../index/impl/EsEntityIndexImpl.java           |  45 ++-
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../rest/applications/ApplicationResource.java  |  50 +++
 .../cassandra/ManagementServiceImpl.java        |   3 +-
 25 files changed, 1307 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0604247f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------


[14/18] usergrid git commit: Added trace statements to the shard group deletion

Posted by to...@apache.org.
Added trace statements to the shard group deletion


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/db852d0a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/db852d0a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/db852d0a

Branch: refs/heads/USERGRID-1052
Commit: db852d0a1d00f8ea89eddb82d4075b1b774d9673
Parents: 0604247
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 22 14:14:43 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 22 14:14:43 2015 -0600

----------------------------------------------------------------------
 .../impl/shard/impl/ShardGroupDeletionImpl.java          | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/db852d0a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 38a7834..6d2a009 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -125,10 +125,13 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
                                                    final ShardEntryGroup shardEntryGroup,
                                                    final Iterator<MarkedEdge> edgeIterator ) {
 
+        logger.trace( "Beginning audit of shard group {}", shardEntryGroup );
+
         /**
          * Compaction is pending, we cannot check it
          */
         if ( shardEntryGroup.isCompactionPending() ) {
+            logger.trace( "Shard group {} is compacting, not auditing group", shardEntryGroup );
             return DeleteResult.COMPACTION_PENDING;
         }
 
@@ -136,6 +139,7 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
         final long currentTime = timeService.getCurrentTime();
 
         if ( shardEntryGroup.isNew( currentTime ) ) {
+            logger.trace( "Shard group {} contains a shard that is is too new, not auditing group", shardEntryGroup );
             return DeleteResult.TOO_NEW;
         }
 
@@ -143,6 +147,8 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
          * We have edges, and therefore cannot delete them
          */
         if ( edgeIterator.hasNext() ) {
+            logger.trace( "Shard group {} has edges, not deleting", shardEntryGroup );
+
             return DeleteResult.CONTAINS_EDGES;
         }
 
@@ -157,6 +163,7 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
 
             //skip the min shard
             if(shard.isMinShard()){
+                logger.trace( "Shard {} in group {} is the minimum, not deleting", shard, shardEntryGroup );
                 continue;
             }
 
@@ -173,6 +180,8 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
             }
 
             result = DeleteResult.DELETED;
+
+            logger.trace( "Removing shard {} in group {}", shard, shardEntryGroup );
         }
 
 
@@ -187,6 +196,8 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
            }
        }
 
+        logger.trace( "Completed auditing shard group {}", shardEntryGroup );
+
         return result;
     }
 

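The trace statements above also document the audit's control flow fairly completely. A condensed, self-contained sketch of that path, keeping only the checks and result constants visible in the hunks (the NOT_DELETED constant and the reduced Shard type are stand-ins, not the real classes):

import java.util.Iterator;
import java.util.List;

public class ShardGroupAuditSketch {

    // COMPACTION_PENDING, TOO_NEW, CONTAINS_EDGES and DELETED mirror the constants in the diff;
    // NOT_DELETED is a placeholder for the initial result.
    enum DeleteResult { COMPACTION_PENDING, TOO_NEW, CONTAINS_EDGES, NOT_DELETED, DELETED }

    // Reduced stand-in for a shard: only the one property the flow consults.
    static class Shard {
        private final boolean minShard;

        Shard( final boolean minShard ) { this.minShard = minShard; }

        boolean isMinShard() { return minShard; }
    }

    /**
     * Compacting and too-new groups are skipped, groups that still contain edges are kept,
     * and otherwise every shard except the minimum shard is removed.
     */
    static DeleteResult audit( final boolean compactionPending, final boolean tooNew,
                               final Iterator<?> edges, final List<Shard> shards ) {

        if ( compactionPending ) {
            return DeleteResult.COMPACTION_PENDING;
        }

        if ( tooNew ) {
            return DeleteResult.TOO_NEW;
        }

        if ( edges.hasNext() ) {
            return DeleteResult.CONTAINS_EDGES;
        }

        DeleteResult result = DeleteResult.NOT_DELETED;

        for ( final Shard shard : shards ) {

            // the minimum shard is never removed
            if ( shard.isMinShard() ) {
                continue;
            }

            // ...the real implementation issues the delete mutation for the shard here...
            result = DeleteResult.DELETED;
        }

        return result;
    }
}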

[08/18] usergrid git commit: Add sourceRegion to ElasticsearchIndexEvent, fix logging statements, update sqs/sns client null checks.

Posted by to...@apache.org.
Add sourceRegion to ElasticsearchIndexEvent, fix logging statements, update sqs/sns client null checks.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3a7e60b3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3a7e60b3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3a7e60b3

Branch: refs/heads/USERGRID-1052
Commit: 3a7e60b3131e207890354ca5fa84258795296372
Parents: 19d30ea
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Oct 19 17:07:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Oct 19 17:07:18 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java                | 9 ++++++---
 .../asyncevents/model/ElasticsearchIndexEvent.java          | 3 ++-
 .../persistence/queue/impl/SNSQueueManagerImpl.java         | 9 ++-------
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 67d0dab..2b583b5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -498,7 +498,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         //now queue up the index message
 
-        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+        final ElasticsearchIndexEvent elasticsearchIndexEvent =
+            new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
 
         //send to the topic so all regions index the batch
 
@@ -520,12 +521,14 @@ public class AmazonAsyncEventService implements AsyncEventService {
         final IndexOperationMessage indexOperationMessage;
 
         if(message == null){
-            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
+            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level",
+                messageId);
 
             final String highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 
             if(highConsistency == null){
-                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+                logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
+                    messageId);
 
                 throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
index 207b15e..049c3a5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/ElasticsearchIndexEvent.java
@@ -35,7 +35,8 @@ public final class ElasticsearchIndexEvent extends AsyncEvent {
     public ElasticsearchIndexEvent() {
     }
 
-    public ElasticsearchIndexEvent(  UUID indexBatchId ) {
+    public ElasticsearchIndexEvent(String sourceRegion, UUID indexBatchId) {
+        super(sourceRegion);
         this.indexBatchId = indexBatchId;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3a7e60b3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 58b2a4d..3c18992 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -543,11 +543,6 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void sendMessages( final List bodies ) throws IOException {
 
-        if ( snsAsync == null ) {
-            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
-            return;
-        }
-
         for ( Object body : bodies ) {
             sendMessage( ( Serializable ) body );
         }
@@ -557,8 +552,8 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public <T extends Serializable> void sendMessage( final T body ) throws IOException {
 
-        if ( snsAsync == null ) {
-            logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+        if ( sqsAsync == null ) {
+            logger.error( "SQS client is null, perhaps it failed to initialize successfully" );
             return;
         }
 

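The logging portion of this commit is a small but common fix: the original statements declared a {} placeholder without passing a matching argument, so SLF4J printed the braces literally and the message id was lost. A short sketch of the broken and corrected forms (the logger and id are illustrative):

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PlaceholderLoggingSketch {

    private static final Logger logger = LoggerFactory.getLogger( PlaceholderLoggingSketch.class );

    public static void main( final String[] args ) {

        final UUID messageId = UUID.randomUUID();

        // broken: no argument supplied, so "{}" appears verbatim in the log output
        logger.error( "Received message with id {} to process, unable to find it" );

        // fixed: the id is substituted into the placeholder
        logger.error( "Received message with id {} to process, unable to find it", messageId );
    }
}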

[15/18] usergrid git commit: Removed unused class and added logging

Posted by to...@apache.org.
Removed unused class and added logging


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b4faf72
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b4faf72
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b4faf72

Branch: refs/heads/USERGRID-1052
Commit: 8b4faf72221ec9eb2b8bb59cca0263c20b483ccb
Parents: db852d0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Oct 23 09:58:45 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Oct 23 09:58:45 2015 -0600

----------------------------------------------------------------------
 .../read/traverse/AbstractReadGraphFilter.java  | 10 ++-
 .../read/traverse/EntityLoadVerifyFilter.java   | 24 +++---
 .../persistence/ObservableIterator.java         | 83 --------------------
 .../persistence/core/rx/ObservableIterator.java |  2 +-
 4 files changed, 24 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index f477092..d3e0345 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -20,6 +20,9 @@
 package org.apache.usergrid.corepersistence.pipeline.read.traverse;
 
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
 import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
@@ -41,6 +44,8 @@ import rx.Observable;
  */
 public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> {
 
+    private static final Logger logger = LoggerFactory.getLogger( AbstractReadGraphFilter.class );
+
     private final GraphManagerFactory graphManagerFactory;
 
 
@@ -82,7 +87,10 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
              */
             return graphManager.loadEdgesFromSource( search )
                 //set the edge state for cursors
-                .doOnNext( edge -> edgeCursorState.update( edge ) )
+                .doOnNext( edge -> {
+                    logger.trace( "Seeking over edge {}", edge );
+                    edgeCursorState.update( edge );
+                } )
 
                     //map our id from the target edge  and set our cursor every edge we traverse
                 .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
index 41507e9..c782bce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/EntityLoadVerifyFilter.java
@@ -47,7 +47,9 @@ import rx.Observable;
  *
  * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
  */
-public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>>{
+public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, FilterResult<Entity>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( EntityLoadVerifyFilter.class );
 
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
 
@@ -69,17 +71,19 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
         final Observable<FilterResult<Entity>> entityObservable =
             filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
 
-                    final Observable<EntitySet> entitySetObservable =
-                        Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
-                                  .flatMap( ids -> entityCollectionManager.load( ids ) );
+                logger.trace( "Attempting to batch load ids {}", bufferedIds );
+
+                final Observable<EntitySet> entitySetObservable =
+                    Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
+                              .flatMap( ids -> entityCollectionManager.load( ids ) );
 
 
-                    //now we have a collection, validate our canidate set is correct.
+                //now we have a collection, validate our canidate set is correct.
 
-                    return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
-                                              .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
-                            entityCollector -> Observable.from( entityCollector.getResults() ) );
-                } );
+                return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
+                                          .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+                        entityCollector -> Observable.from( entityCollector.getResults() ) );
+            } );
 
         return entityObservable;
     }
@@ -132,7 +136,7 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
             //doesn't exist warn and drop
             if ( entity == null || !entity.getEntity().isPresent() ) {
                 logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
-                        + "  Ignoring since this could be a region sync issue", candidateId );
+                    + "  Ignoring since this could be a region sync issue", candidateId );
 
 
                 //TODO trigger an audit after a fail count where we explicitly try to repair from other regions

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
deleted file mode 100644
index 9befb79..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/ObservableIterator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-
-import java.util.Iterator;
-
-
-/**
- * Converts an iterator to an observable.  Subclasses need to only implement getting the iterator from the data source.
- * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows us
- * to wrap the iterator in a deferred invocation to avoid the blocking on construction.
- */
-public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T> {
-
-    private static final Logger log = LoggerFactory.getLogger(ObservableIterator.class);
-
-    private final String name;
-
-
-    /**
-     * @param name The simple name of the iterator, used for debugging
-     */
-    protected ObservableIterator( final String name ) {this.name = name;}
-
-
-    @Override
-    public void call( final Subscriber<? super T> subscriber ) {
-
-
-        try {
-            //get our iterator and push data to the observer
-            final Iterator<T> itr = getIterator();
-
-            Preconditions.checkNotNull(itr,
-                    "The observable must return an iterator.  Null was returned for iterator " + name);
-
-
-            //while we have items to emit and our subscriber is subscribed, we want to keep emitting items
-            while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
-                final T next = itr.next();
-
-                log.trace( "Iterator '{}' emitting item '{}'", name, next );
-
-                subscriber.onNext( next );
-            }
-
-
-            subscriber.onCompleted();
-        }
-
-        //if any error occurs, we need to notify the observer so it can perform it's own error handling
-        catch ( Throwable t ) {
-            log.error( "Unable to emit items from iterator {}", name, t );
-            subscriber.onError( t );
-        }
-    }
-
-
-    /**
-     * Return the iterator to feed data to
-     */
-    protected abstract Iterator<T> getIterator();
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b4faf72/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 84a7fc3..57409e1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -66,7 +66,7 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
             while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
                 final T next = itr.next();
 
-//                log.trace( "Iterator '{}' emitting item '{}'", name, next );
+                log.trace( "Iterator '{}' emitting item '{}'", name, next );
 
                 subscriber.onNext( next );
             }

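As the hunk above shows, the surviving core/rx ObservableIterator keeps its per-item trace line; it implements Observable.OnSubscribe, so a data source only needs to supply the iterator and subscription handles the rest. A minimal usage sketch against RxJava 1.x, assuming the core/rx copy keeps the same protected name constructor as the duplicate removed above (the list of names is made up for illustration):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.usergrid.persistence.core.rx.ObservableIterator;

import rx.Observable;

public class ObservableIteratorUsageSketch {

    public static void main( final String[] args ) {

        final List<String> names = Arrays.asList( "alpha", "beta", "gamma" );

        // the iterator is fetched lazily, on subscription, so any I/O needed to build it
        // is deferred instead of happening when the Observable is constructed
        final Observable<String> observable = Observable.create( new ObservableIterator<String>( "names" ) {
            @Override
            protected Iterator<String> getIterator() {
                return names.iterator();
            }
        } );

        observable.toBlocking().forEach( System.out::println );
    }
}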

[07/18] usergrid git commit: Updates the defaults to be more sensible in a multi-region environment

Posted by to...@apache.org.
Updates the defaults to be more sensible in a multi-region environment


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3ec0f588
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3ec0f588
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3ec0f588

Branch: refs/heads/USERGRID-1052
Commit: 3ec0f5886b82737d4a7ed64fae01afbdb6707763
Parents: 4013f17
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 17:44:39 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 17:44:39 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/index/PublishRxTest.java    |  95 ----------------
 .../usergrid/corepersistence/index/RxTest.java  | 108 +++++++++++++++++++
 .../persistence/core/astyanax/CassandraFig.java |   6 +-
 3 files changed, 111 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
deleted file mode 100644
index 973a42d..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/PublishRxTest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import rx.Observable;
-import rx.Subscription;
-import rx.observables.ConnectableObservable;
-import rx.schedulers.Schedulers;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- * Test to test some assumptions about RX behaviors
- */
-public class PublishRxTest {
-
-    @Test
-    public void testPublish() throws InterruptedException {
-
-        final int count = 10;
-
-        final CountDownLatch latch = new CountDownLatch( count );
-
-        final Subscription connectedObservable =
-            Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
-                      .subscribe();
-
-
-        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
-        assertTrue( "publish1 behaves as expected", completed );
-
-        final boolean completedSubscription = connectedObservable.isUnsubscribed();
-
-        assertTrue( "Subscription complete", completedSubscription );
-    }
-
-
-    @Test
-    @Ignore("This seems like it should work, yet blocks forever")
-    public void testConnectableObserver() throws InterruptedException {
-
-        final int count = 10;
-
-        final CountDownLatch latch = new CountDownLatch( count );
-
-        final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
-
-
-        //connect to our latch, which should run on it's own subscription
-        //start our latch running
-        connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
-
-
-        final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
-
-        //start the sequence
-        connectedObservable.connect();
-
-
-        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
-
-        assertTrue( "publish1 behaves as expected", completed );
-
-        final int returnedCount = countObservable.toBlocking().last();
-
-        assertEquals( "Counts the same", count, returnedCount );
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
new file mode 100644
index 0000000..1d940d0
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Subscription;
+import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test to test some assumptions about RX behaviors
+ */
+public class RxTest {
+
+    @Test
+    public void testPublish() throws InterruptedException {
+
+        final int count = 10;
+
+        final CountDownLatch latch = new CountDownLatch( count );
+
+        final Subscription connectedObservable =
+            Observable.range( 0, count ).doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() )
+                      .subscribe();
+
+
+        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+        assertTrue( "publish1 behaves as expected", completed );
+
+        final boolean completedSubscription = connectedObservable.isUnsubscribed();
+
+        assertTrue( "Subscription complete", completedSubscription );
+    }
+
+
+    @Test
+    @Ignore("This seems like it should work, yet blocks forever")
+    public void testConnectableObserver() throws InterruptedException {
+
+        final int count = 10;
+
+        final CountDownLatch latch = new CountDownLatch( count );
+
+        final ConnectableObservable<Integer> connectedObservable = Observable.range( 0, count ).publish();
+
+
+        //connect to our latch, which should run on it's own subscription
+        //start our latch running
+        connectedObservable.doOnNext( integer -> latch.countDown() ).subscribeOn( Schedulers.io() ).subscribe();
+
+
+        final Observable<Integer> countObservable = connectedObservable.subscribeOn( Schedulers.io() ).count();
+
+        //start the sequence
+        connectedObservable.connect();
+
+
+        final boolean completed = latch.await( 5, TimeUnit.SECONDS );
+
+        assertTrue( "publish1 behaves as expected", completed );
+
+        final int returnedCount = countObservable.toBlocking().last();
+
+        assertEquals( "Counts the same", count, returnedCount );
+    }
+
+
+    /**
+     * Tests that reduce emits
+     */
+    @Test
+    public void testReduceEmpty(){
+       final int result =  Observable.range( 0, 100 ).filter( value -> value == -1 ).reduce( 0, ( integer, integer2 ) -> integer + 1 ).toBlocking().last();
+
+        assertEquals(0, result);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3ec0f588/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 79c198f..e98e0fd 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -79,15 +79,15 @@ public interface CassandraFig extends GuicyFig {
     String getDiscoveryType();
 
 
-    @Default("CL_LOCAL_ONE")
+    @Default("CL_LOCAL_QUORUM")
     @Key(READ_CL)
     String getReadCL();
 
-    @Default("CL_LOCAL_QUORUM")
+    @Default("CL_QUORUM")
     @Key(READ_CONSISTENT_CL)
     String getConsistentReadCL();
 
-    @Default("CL_QUORUM")
+    @Default("CL_LOCAL_QUORUM")
     @Key(WRITE_CL)
     String getWriteCL();
 

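The CassandraFig hunk above is the substance of this commit: routine reads and writes now default to CL_LOCAL_QUORUM so normal traffic stays within the local datacenter, while the "consistent" read level defaults to CL_QUORUM and pays the cross-region cost only when strong consistency is requested. A stripped-down GuicyFig-style interface showing the three settings as they stand after this change (the property key strings are placeholders, not the real constants; only the defaults come from the diff):

import org.safehaus.guicyfig.Default;
import org.safehaus.guicyfig.GuicyFig;
import org.safehaus.guicyfig.Key;

/**
 * Reduced view of the consistency-level settings after this commit.
 * The key strings below are illustrative placeholders; the real keys live in
 * the constants referenced by the full CassandraFig interface.
 */
public interface ConsistencySettingsSketch extends GuicyFig {

    // routine reads stay within the local datacenter
    @Default( "CL_LOCAL_QUORUM" )
    @Key( "example.cassandra.read.cl" )
    String getReadCL();

    // "consistent" reads wait on a quorum across all regions
    @Default( "CL_QUORUM" )
    @Key( "example.cassandra.read.consistent.cl" )
    String getConsistentReadCL();

    // writes, like routine reads, only wait on the local datacenter
    @Default( "CL_LOCAL_QUORUM" )
    @Key( "example.cassandra.write.cl" )
    String getWriteCL();
}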

[05/18] usergrid git commit: Makes consistency configurable

Posted by to...@apache.org.
Makes consistency configurable


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/19d30eaf
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/19d30eaf
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/19d30eaf

Branch: refs/heads/USERGRID-1052
Commit: 19d30eafc77095ee74ae126f9d0a849e997b6ad7
Parents: 0326629
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:59:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:59:08 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  4 +---
 .../map/impl/MapSerializationImpl.java          | 21 ++++++++++----------
 2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 6b2eb45..67d0dab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,9 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import com.google.common.base.Optional;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
-import org.apache.usergrid.persistence.queue.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +65,7 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueFig;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/19d30eaf/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 1aa3229..ffe10c9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -33,6 +33,8 @@ import org.apache.cassandra.db.marshal.UTF8Type;
 
 import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKey;
 import org.apache.usergrid.persistence.core.astyanax.BucketScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
 import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
@@ -105,15 +107,9 @@ public class MapSerializationImpl implements MapSerialization {
     /**
      * How to funnel keys for buckets
      */
-    private static final Funnel<String> MAP_KEY_FUNNEL = new Funnel<String>() {
+    private static final Funnel<String> MAP_KEY_FUNNEL = ( key, into ) -> into.putString( key, StringHashUtils.UTF8 );
 
 
-        @Override
-        public void funnel( final String key, final PrimitiveSink into ) {
-            into.putString( key, StringHashUtils.UTF8 );
-        }
-    };
-
     /**
      * Locator to get us all buckets
      */
@@ -121,10 +117,14 @@ public class MapSerializationImpl implements MapSerialization {
         new ExpandingShardLocator<>( MAP_KEY_FUNNEL, NUM_BUCKETS );
 
     private final Keyspace keyspace;
+    private final CassandraConfig cassandraConfig;
 
 
     @Inject
-    public MapSerializationImpl( final Keyspace keyspace ) {this.keyspace = keyspace;}
+    public MapSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig ) {
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+    }
 
 
     @Override
@@ -387,7 +387,7 @@ public class MapSerializationImpl implements MapSerialization {
         //now get all columns, including the "old row key value"
         try {
             final Column<Boolean> result =
-                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( ConsistencyLevel.CL_QUORUM )
+                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getConsistentReadCL() )
                         .getKey( entryRowKey ).getColumn( true ).execute().getResult();
 
             return result;
@@ -421,7 +421,8 @@ public class MapSerializationImpl implements MapSerialization {
         //now get all columns, including the "old row key value"
         try {
             final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows =
-                keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true ).execute()
+                keyspace.prepareQuery( MAP_ENTRIES ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKeySlice(
+                    rowKeys ).withColumnSlice( true ).execute()
                         .getResult();