You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:39 UTC

[11/50] incubator-usergrid git commit: Refactored to write to cassandra for data, and to SQS for the id of the data.

Refactored to write to cassandra for data, and to SQS for the id of the data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d8eb060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d8eb060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d8eb060

Branch: refs/heads/two-dot-o
Commit: 8d8eb060e20bd9942d591227ae2cdd5337214b81
Parents: 9111d94
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 20:38:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 20:38:52 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/map/MapManager.java    |  35 ++--
 .../persistence/map/impl/MapManagerImpl.java    |   8 +
 .../persistence/map/impl/MapSerialization.java  |   9 +
 .../map/impl/MapSerializationImpl.java          |  93 +++++++++
 .../persistence/map/MapManagerTest.java         |  49 ++++-
 stack/corepersistence/queryindex/pom.xml        |   1 -
 .../index/IndexOperationMessage.java            |  36 +++-
 .../index/impl/BufferQueueSQSImpl.java          | 167 ++++++++++++++--
 .../persistence/index/impl/DeIndexRequest.java  |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 196 +++++++++++--------
 .../index/guice/TestIndexModule.java            |   4 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 145 ++++++++++++++
 .../impl/EntityConnectionIndexImplTest.java     |   4 +-
 .../persistence/index/impl/EntityIndexTest.java |  21 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  32 ++-
 15 files changed, 656 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 62fe57d..69e0874 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -1,23 +1,26 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 
@@ -33,6 +36,14 @@ public interface MapManager {
      */
     public String getString( final String key );
 
+
+    /**
+     * Get the values for all the keys.  If a value does not exist, it won't be present in the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings(final Collection<String> keys);
+
     /**
      * Return the string, null if not found
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index c077c7d..fb2e7ff 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.map.MapManager;
@@ -51,6 +53,12 @@ public class MapManagerImpl implements MapManager {
 
 
     @Override
+    public Map<String, String> getStrings( final Collection<String> keys ) {
+        return mapSerialization.getStrings( scope, keys );
+    }
+
+
+    @Override
     public void putString( final String key, final String value ) {
           mapSerialization.putString( scope, key, value );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 6e7e328..2e958c2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -33,6 +35,13 @@ public interface MapSerialization extends Migration {
     public String getString( final MapScope scope, final String key );
 
     /**
+     * Get strings from the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+
+    /**
      * Return the string, null if not found
      */
     public void putString( final MapScope scope, final String key, final String value );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 715c202..825d636 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,9 +18,12 @@
  */
 
 package org.apache.usergrid.persistence.map.impl;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.base.Preconditions;
@@ -50,6 +53,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.ColumnFamilyQuery;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -73,6 +79,9 @@ public class MapSerializationImpl implements MapSerialization {
         private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
+    private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+
+
         /**
          * CFs where the row key contains the source node id
          */
@@ -126,6 +135,12 @@ public class MapSerializationImpl implements MapSerialization {
 
 
     @Override
+    public Map<String, String> getStrings(final MapScope scope,  final Collection<String> keys ) {
+        return getValues( scope, keys, STRING_RESULTS_BUILDER );
+    }
+
+
+    @Override
     public void putString( final MapScope scope, final String key, final String value ) {
         final RowOp op = new RowOp() {
             @Override
@@ -371,6 +386,49 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
+    /**
+     * Get multiple values, using the string builder
+     * @param scope
+     * @param keys
+     * @param builder
+     * @param <T>
+     * @return
+     */
+    private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+
+
+        final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+
+        for(final String key: keys){
+             //add it to the entry
+            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+
+            rowKeys.add( entryRowKey );
+
+        }
+
+
+
+          //now get all columns, including the "old row key value"
+          try {
+              final Rows<ScopedRowKey<MapEntryKey>, Boolean>
+                  rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
+                                                     .execute().getResult();
+
+
+             return builder.buildResults( rows );
+          }
+          catch ( NotFoundException nfe ) {
+              //nothing to return
+              return null;
+          }
+          catch ( ConnectionException e ) {
+              throw new RuntimeException( "Unable to connect to cassandra", e );
+          }
+      }
+
+
+
     private void executeBatch(MutationBatch batch) {
         try {
             batch.execute();
@@ -449,4 +507,39 @@ public class MapSerializationImpl implements MapSerialization {
             return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
         }
     }
+
+
+    /**
+     * Build the results from the row keys
+     * @param <T>
+     */
+    private static interface ResultsBuilder<T> {
+
+        public T buildResults(final  Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+    }
+
+    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+        @Override
+        public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
+            final int size = rows.size();
+
+            final Map<String, String> results = new HashMap<>(size);
+
+            for(int i = 0; i < size; i ++){
+
+                final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+
+                final String value = row.getColumns().getStringValue( true, null );
+
+                if(value == null){
+                    continue;
+                }
+
+               results.put( row.getKey().getKey().key,  value );
+            }
+
+            return results;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index df4394e..41286ab 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,6 +20,9 @@
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -34,9 +37,11 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.map.guice.TestMapModule;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
+import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -79,6 +84,47 @@ public class MapManagerTest {
 
 
     @Test
+    public void multiReadNoKey() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key = UUIDGenerator.newTimeUUID().toString();
+
+        final Map<String, String> results = mm.getStrings( Collections.singleton( key ) );
+
+        assertNotNull( results );
+
+        final String shouldBeMissing = results.get( key );
+
+        assertNull( shouldBeMissing );
+    }
+
+
+    @Test
+    public void writeReadStringBatch() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key1 = "key1";
+        final String value1 = "value1";
+
+        mm.putString( key1, value1 );
+
+
+        final String key2 = "key2";
+        final String value2 = "value2";
+
+        mm.putString( key2, value2 );
+
+
+        final Map<String, String> returned = mm.getStrings( Arrays.asList( key1, key2 ) );
+
+        assertNotNull( returned );
+
+        assertEquals( value1, returned.get( key1 ) );
+        assertEquals( value2, returned.get( key2 ) );
+    }
+
+
+    @Test
     public void writeReadStringTTL() throws InterruptedException {
 
         MapManager mm = mmf.createMapManager( this.scope );
@@ -106,8 +152,7 @@ public class MapManagerTest {
         //now read it should be gone
         final String timedOut = mm.getString( key );
 
-        assertNull("Value was not returned", timedOut);
-
+        assertNull( "Value was not returned", timedOut );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index a5fbf6a..af843ad 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -145,7 +145,6 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>${slf4j.version}</version>
-            <scope>test</scope>
         </dependency>
 
         <!-- common stuff, logging, etc.-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 7d8a859..a7388d6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -60,17 +60,17 @@ public class IndexOperationMessage implements Serializable {
 
 
     public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
-        indexRequests.addAll( indexRequests );
+        this.indexRequests.addAll( indexRequests );
     }
 
 
     public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
-        deIndexRequests.add( deIndexRequest );
+        this.deIndexRequests.add( deIndexRequest );
     }
 
 
     public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
-        deIndexRequests.addAll( deIndexRequests );
+        this.deIndexRequests.addAll( deIndexRequests );
     }
 
 
@@ -96,4 +96,34 @@ public class IndexOperationMessage implements Serializable {
     public BetterFuture<IndexOperationMessage> getFuture() {
         return containerFuture;
     }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final IndexOperationMessage that = ( IndexOperationMessage ) o;
+
+        if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
+            return false;
+        }
+        if ( !indexRequests.equals( that.indexRequests ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = indexRequests.hashCode();
+        result = 31 * result + deIndexRequests.hashCode();
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 833e045..c6acb36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -23,56 +23,132 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
+/**
+ * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
+ * performing
+ */
 @Singleton
 public class BufferQueueSQSImpl implements BufferQueue {
 
+    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
     /** Hacky, copied from CPEntityManager b/c we can't access it here */
     public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
 
 
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
     private static final String QUEUE_NAME = "es_queue";
 
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
     private final QueueManager queue;
+    private final MapManager mapManager;
     private final IndexFig indexFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
 
 
     @Inject
-    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
-        final QueueScope scope =
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
+                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope =
             new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
 
-        this.queue = queueManagerFactory.getQueueManager( scope );
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
         this.indexFig = indexFig;
+
+        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+        this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
     }
 
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
 
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
 
         try {
-            this.queue.sendMessage( operation );
+
+            final String payLoad = toString( operation );
+
+            //write to cassandra
+            this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
             operation.getFuture().run();
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
         }
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -83,23 +159,71 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
         final int actualTake = Math.min( 10, takeSize );
 
-        List<QueueMessage> messages = queue
-            .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                IndexOperationMessage.class );
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
 
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                    String.class );
 
-        final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-        for ( final QueueMessage message : messages ) {
 
-            final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
+            final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-            SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message,  messageBody );
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
 
-            response.add( operation );
-        }
 
-        return response;
+            if(messages.size() == 0){
+                return response;
+            }
+
+            //add all our keys  for a single round trip
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+            //look up the values
+            final Map<String, String> values = mapManager.getStrings( mapEntries );
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String key = message.getBody().toString();
+
+                //now see if the key was there
+                final String payload = values.get( key );
+
+                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
+                // a DLQ
+
+                if ( payload == null ) {
+                    continue;
+                }
+
+                final IndexOperationMessage messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -107,7 +231,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
     public void ack( final List<IndexOperationMessage> messages ) {
 
         //nothing to do
-        if(messages.size() == 0){
+        if ( messages.size() == 0 ) {
             return;
         }
 
@@ -121,6 +245,19 @@ public class BufferQueueSQSImpl implements BufferQueue {
     }
 
 
+    /** Read the object from Base64 string. */
+    private IndexOperationMessage fromString( String s ) throws IOException {
+        IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+        return o;
+    }
+
+
+    /** Write the object to a Base64 string. */
+    private String toString( IndexOperationMessage o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+
     /**
      * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index c63c4df..9f3ce66 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -37,9 +37,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public class DeIndexRequest implements BatchRequest {
 
-    public final String[] indexes;
-    public final String entityType;
-    public final String documentId;
+    public String[] indexes;
+    public String entityType;
+    public String documentId;
 
 
     public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
@@ -49,6 +49,10 @@ public class DeIndexRequest implements BatchRequest {
     }
 
 
+    public DeIndexRequest() {
+    }
+
+
     @Override
     public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 8547889..3fc3e77 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
+import rx.Subscription;
 import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -60,17 +62,23 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final IndexFig config;
     private final FailureMonitorImpl failureMonitor;
     private final Client client;
-    private final Observable<List<IndexOperationMessage>> consumer;
+
     private final Timer flushTimer;
     private final Counter indexSizeCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
 
+    //the actively running subscription
+    private Subscription subscription;
+
+    private  Observable<List<IndexOperationMessage>> consumer;
+
+    private Object mutex = new Object();
+
     @Inject
     public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
         provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
-        this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -78,81 +86,101 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+        this.bufferQueue = bufferQueue;
+
 
 
-        final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
-            @Override
-            public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+          start();
+    }
+
+
+    public void start() {
+        synchronized ( mutex) {
 
-                //name our thread so it's easy to see
-                Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+            final AtomicInteger countFail = new AtomicInteger();
 
-                List<IndexOperationMessage> drainList;
-                do {
-                    try {
+            Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+                @Override
+                public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
+                    //name our thread so it's easy to see
+                    Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
 
-                        Timer.Context timer = produceTimer.time();
+                    List<IndexOperationMessage> drainList;
+                    do {
+                        try {
 
-                        drainList = bufferQueue
-                            .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
 
+                            Timer.Context timer = produceTimer.time();
 
-                        subscriber.onNext( drainList );
+                            drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+                                TimeUnit.MILLISECONDS );
 
 
-                        timer.stop();
+                            subscriber.onNext( drainList );
 
-                        countFail.set( 0 );
-                    }
-                    catch ( EsRejectedExecutionException err ) {
-                        countFail.incrementAndGet();
-                        log.error(
-                            "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  "
-                                + "Failed {} consecutive times",
-                            config.getFailRefreshCount(), countFail.get() );
-
-                        //es  rejected the exception, sleep and retry in the queue
-                        try {
-                            Thread.sleep( config.getFailureRetryTime() );
+
+                            timer.stop();
+
+                            countFail.set( 0 );
                         }
-                        catch ( InterruptedException e ) {
-                            //swallow
+                        catch ( EsRejectedExecutionException err ) {
+                            countFail.incrementAndGet();
+                            log.error(
+                                "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  " + "Failed {} consecutive times", config.getFailRefreshCount(),
+                                countFail.get() );
+
+                            //es  rejected the exception, sleep and retry in the queue
+                            try {
+                                Thread.sleep( config.getFailureRetryTime() );
+                            }
+                            catch ( InterruptedException e ) {
+                                //swallow
+                            }
                         }
-                    }
-                    catch ( Exception e ) {
-                        int count = countFail.incrementAndGet();
-                        log.error( "failed to dequeue", e );
-                        if ( count > 200 ) {
-                            log.error( "Shutting down index drain due to repetitive failures" );
+                        catch ( Exception e ) {
+                            int count = countFail.incrementAndGet();
+                            log.error( "failed to dequeue", e );
+                            if ( count > 200 ) {
+                                log.error( "Shutting down index drain due to repetitive failures" );
+                            }
                         }
                     }
+                    while ( true );
                 }
-                while ( true );
-            }
-        } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
-            @Override
-            public void call( List<IndexOperationMessage> containerList ) {
-                if ( containerList.size() > 0 ) {
-                    flushMeter.mark( containerList.size() );
-                    Timer.Context time = flushTimer.time();
-                    execute( containerList );
-                    time.stop();
+            } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                @Override
+                public void call( List<IndexOperationMessage> containerList ) {
+                    if ( containerList.size() > 0 ) {
+                        flushMeter.mark( containerList.size() );
+                        Timer.Context time = flushTimer.time();
+                        execute( containerList );
+                        time.stop();
+                    }
                 }
+            } )
+                //ack after we process
+                .doOnNext( new Action1<List<IndexOperationMessage>>() {
+                    @Override
+                    public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+                        bufferQueue.ack( indexOperationMessages );
+                    }
+                } );
+
+            //start in the background
+            subscription = consumer.subscribe();
+        }
+    }
+
+
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+            if(subscription != null) {
+                subscription.unsubscribe();
             }
-} )
-            //ack after we process
-          .doOnNext( new Action1<List<IndexOperationMessage>>() {
-              @Override
-              public void call( final List<IndexOperationMessage> indexOperationMessages ) {
-                  bufferQueue.ack( indexOperationMessages );
-              }
-          } );
-
-        //start in the background
-        consumer.subscribe();
+        }
     }
 
     /**
@@ -165,43 +193,36 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
-            .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
-                @Override
-                public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
-                    final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
-                    final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
-
-                    return Observable.merge( deIndex, index );
-                }
-            } );
-
+        Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
 
 
         //batch shard operations into a bulk request
-        flattenMessages.toList()
-            .doOnNext(new Action1<List<BatchRequest>>() {
-                @Override
-                public void call(List<BatchRequest> builders) {
-                    try {
-                        final BulkRequestBuilder bulkRequest = initRequest();
-                        for (BatchRequest builder : builders) {
-                            indexSizeCounter.dec();
+        flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+            @Override
+            public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
+                final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
+                final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
 
-                            builder.doOperation( client, bulkRequest );
-                        }
-                        sendRequest(bulkRequest);
-                    }catch (Exception e){
-                        log.error("Failed while sending bulk",e);
-                    }
-                }
-            })
-            .toBlocking().lastOrDefault(null);
+                return Observable.merge( index, deIndex );
+            }
+        } )
+      //collection all the operations into a single stream
+       .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+           @Override
+           public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+               batchRequest.doOperation( client, bulkRequestBuilder );
+           }
+       } )
+        //send the request off to ES
+        .doOnNext( new Action1<BulkRequestBuilder>() {
+            @Override
+            public void call( final BulkRequestBuilder bulkRequestBuilder ) {
+                sendRequest( bulkRequestBuilder );
+            }
+        } ).toBlocking().last();
 
         //call back all futures
         Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
             .doOnNext(new Action1<IndexOperationMessage>() {
                 @Override
                 public void call(IndexOperationMessage operationMessage) {
@@ -211,6 +232,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .toBlocking().lastOrDefault(null);
     }
 
+
     /**
      * initialize request
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..7d7a18d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..4a4672e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.*;
+
+
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+    @Inject
+    private BufferQueueSQSImpl bufferQueueSQS;
+
+    @Inject
+    private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+
+
+    @Before
+    public void stop() {
+        esIndexBufferConsumer.stop();
+    }
+
+
+    @After
+    public void after() {
+        esIndexBufferConsumer.start();
+    }
+
+
+
+
+    @Test
+    public void testMessageIndexing(){
+
+        final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
+        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
+
+
+        final Map<String, Object> request2Data  = new HashMap<String, Object>() {{put("test", "testval2");}};
+        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", "testType2", "testDoc2",request2Data );
+
+
+        //de-index request
+        final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, "testType3", "testId3" );
+
+        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, "testType4", "testId4" );
+
+
+
+
+        IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+        indexOperationMessage.addIndexRequest( indexRequest1);
+        indexOperationMessage.addIndexRequest( indexRequest2);
+
+        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+        bufferQueueSQS.offer( indexOperationMessage );
+
+        //wait for it to send to SQS
+        indexOperationMessage.getFuture().get();
+
+        //now get it back
+
+        final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+
+        assertTrue(ops.size() > 1);
+
+        final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+         //get the operations out
+
+        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+        assertTrue(indexRequestSet.contains(indexRequest1));
+        assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+
+        //now ack the message
+
+        bufferQueueSQS.ack( ops );
+
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 215ff57..c5f3488 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
         EsTestUtils.waitForTasks(personLikesIndex);
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a15053c..a2135a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,12 +24,14 @@ import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.model.field.ArrayField;
 import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -69,8 +71,14 @@ public class EntityIndexTest extends BaseIT {
     @Inject
     public EntityIndexFactory eif;
 
+    //TODO T.N. Remove this when we move the cursor mapping back to core
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
     @Test
-    public void testIndex() throws IOException {
+    public void testIndex() throws IOException, InterruptedException {
         Id appId = new SimpleId( "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -86,7 +94,9 @@ public class EntityIndexTest extends BaseIT {
 
         entityIndex.refresh();
 
-        testQueries( indexScope, searchTypes,  entityIndex );
+        Thread.sleep(100000000);
+
+        testQueries( indexScope, searchTypes, entityIndex );
     }
 
     @Test
@@ -660,15 +670,12 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < size; i++ ) {
             final String middleName = "middleName" + UUIDUtils.newTimeUUID();
-            Map<String, Object> properties = new LinkedHashMap<String, Object>();
-            properties.put( "username", "edanuff" );
-            properties.put( "email", "ed@anuff.com" );
-            properties.put( "middlename", middleName );
 
             Map entityMap = new HashMap() {{
                 put( "username", "edanuff" );
                 put( "email", "ed@anuff.com" );
                 put( "middlename", middleName );
+                put( "created", System.nanoTime() );
             }};
 
             final Id userId = new SimpleId( "user" );
@@ -700,7 +707,7 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < expectedPages; i++ ) {
             //**
-            final Query query = Query.fromQL( "select *" );
+            final Query query = Query.fromQL( "select * order by created" );
             query.setLimit( limit );
 
             if ( cursor != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 10aa621..e2c5c1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -48,20 +48,19 @@ import java.util.concurrent.TimeUnit;
 public class SQSQueueManagerImpl implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
-    private  AmazonSQSClient sqs;
-    private  QueueScope scope;
-    private  QueueFig fig;
+    private  final AmazonSQSClient sqs;
+    private  final QueueScope scope;
     private  ObjectMapper mapper;
     private static SmileFactory smileFactory = new SmileFactory();
 
-    private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
+    private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
             .maximumSize(1000)
             .build(new CacheLoader<SqsLoader, Queue>() {
                        @Override
                        public Queue load(SqsLoader queueLoader) throws Exception {
                            Queue queue = null;
                            try {
-                               GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+                               GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
                                queue = new Queue(result.getQueueUrl());
                            } catch (QueueDoesNotExistException queueDoesNotExistException) {
                                queue = null;
@@ -73,7 +72,7 @@ public class SQSQueueManagerImpl implements QueueManager {
                                String name = queueLoader.getKey();
                                CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                                        .withQueueName(name);
-                               CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+                               CreateQueueResult result = sqs.createQueue(createQueueRequest);
                                String url = result.getQueueUrl();
                                queue = new Queue(url);
                                LOG.info("Created queue with url {}", url);
@@ -85,7 +84,6 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
-        this.fig = fig;
         this.scope = scope;
         try {
             UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
@@ -99,8 +97,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
-            LOG.warn("failed to setup SQS",e);
-//            throw new RuntimeException("Error setting up mapper", e);
+            throw new RuntimeException("Error setting up mapper", e);
         }
     }
 
@@ -127,14 +124,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
-        LOG.info("Getting {} messages from {}", limit, url);
+        LOG.debug( "Getting {} messages from {}", limit, url);
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
         receiveMessageRequest.setWaitTimeSeconds(waitTime);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
-        LOG.info("Received {} messages from {}",messages.size(),url);
+        LOG.debug( "Received {} messages from {}", messages.size(), url);
         List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
         for (Message message : messages) {
             Object body ;
@@ -157,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Messages...{} to {}", bodies.size(), url);
+        LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
 
         SendMessageBatchRequest request = new SendMessageBatchRequest(url);
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
@@ -180,7 +177,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        LOG.debug( "Sending Message...{} to {}", body.toString(), url);
 
         final String stringBody = toString(body);
 
@@ -192,7 +189,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessage(QueueMessage queueMessage) {
         String url = getQueue().getUrl();
-        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+        LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
                 .withQueueUrl(url)
@@ -203,7 +200,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessages(List<QueueMessage> queueMessages) {
         String url = getQueue().getUrl();
-        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+        LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
         for(QueueMessage message : queueMessages){
             entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
@@ -233,16 +230,11 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public class SqsLoader {
         private final String key;
-        private final AmazonSQSClient client;
 
         public SqsLoader(String key, AmazonSQSClient client) {
             this.key = key;
-            this.client = client;
         }
 
-        public AmazonSQSClient getClient() {
-            return client;
-        }
 
         public String getKey() {
             return key;