You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/11 03:38:56 UTC

[1/5] incubator-usergrid git commit: Added buffer wiring to guice and updated the tests.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-466 31bee37a4 -> 8d8eb060e


Added buffer wiring to guice and updated the tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cd0015d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cd0015d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cd0015d5

Branch: refs/heads/USERGRID-466
Commit: cd0015d5af1371528943e015022be21b2d8099f9
Parents: c5a4767
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 14:56:47 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 14:56:47 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexFig.java    |  9 ++++++++
 .../persistence/index/guice/IndexModule.java    | 10 ++++++++-
 .../persistence/index/impl/BufferQueue.java     |  7 ++++++
 .../index/impl/BufferQueueInMemory.java         | 18 ++++++++-------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 23 +++++++++++++++-----
 .../index/impl/EsIndexBufferProducerImpl.java   |  2 +-
 .../index/guice/TestIndexModule.java            |  9 +++++++-
 7 files changed, 62 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index c6f08f6..befbaa9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -64,6 +64,11 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
+    /**
+     *  Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple backpressure
+     */
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME =  "elasticsearch.rejected_retry_wait";
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -158,4 +163,8 @@ public interface IndexFig extends GuicyFig {
     @Default("one")
     @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
     String getWriteConsistencyLevel();
+
+    @Default("1000")
+    @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
+    long getFailureRetryTime();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index ebd9098..d911dab 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
-public class IndexModule extends AbstractModule {
+public abstract class IndexModule extends AbstractModule {
 
     @Override
     protected void configure() {
@@ -48,6 +48,14 @@ public class IndexModule extends AbstractModule {
         bind(IndexBufferProducer.class).to(EsIndexBufferProducerImpl.class);
         bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
 
+        wireBufferQueue();
     }
 
+
+    /**
+     * Wire the {@code BufferQueue} binding for this implementation
+     */
+    public abstract void wireBufferQueue();
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
index dec6ac3..ffc3b90 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueue.java
@@ -46,4 +46,11 @@ public interface BufferQueue {
      * @return
      */
     public List<IndexOperationMessage> take(final int takeSize, final long timeout, final TimeUnit timeUnit );
+
+
+    /**
+     * Ack all messages so they do not appear again.  Meant for transactional queues, and may or may not be implemented
+     * @param messages
+     */
+    public void ack(List<IndexOperationMessage> messages);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
index 502f45d..403762f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
@@ -36,15 +36,11 @@ import com.google.inject.Singleton;
 public class BufferQueueInMemory implements BufferQueue {
 
     private final ArrayBlockingQueue<IndexOperationMessage> messages;
-    private final IndexFig fig;
 
 
     @Inject
-    public BufferQueueInMemory( final ArrayBlockingQueue<IndexOperationMessage> messages, final IndexFig fig ) {
-        this.messages = messages;
-
-
-        this.fig = fig;
+    public BufferQueueInMemory(final IndexFig fig ) {
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
     }
 
 
@@ -78,8 +74,14 @@ public class BufferQueueInMemory implements BufferQueue {
 
             }
         }
-        while ( response.size() < takeSize &&  System.currentTimeMillis() < endTime );
+        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+        return response;
+    }
+
 
-        return null;
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+         //no-op: ack is meant for transactional queues, nothing to acknowledge for this in-memory queue
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 19b8438..45c12a1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -39,6 +39,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -70,9 +71,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferConsumerImpl( final IndexFig config, final IndexBufferProducer producer, final EsProvider
-        provider, final MetricsFactory metricsFactory,
-                                      final BufferQueue bufferQueue ){
+    public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
+        provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
         this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
@@ -101,17 +101,30 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         for ( IndexOperationMessage drained : drainList ) {
                             subscriber.onNext( drained );
                         }
-                        drainList.clear();
+
+                        bufferQueue.ack( drainList );
+
                         timer.stop();
 
                         countFail.set( 0 );
                     }
+                    catch( EsRejectedExecutionException err)  {
+                        countFail.incrementAndGet();
+                        log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+
+                       //ES rejected the request, sleep and retry in the queue
+                        try {
+                            Thread.sleep( config.getFailureRetryTime() );
+                        }
+                        catch ( InterruptedException e ) {
+                            //swallow
+                        }
+                    }
                     catch ( Exception e ) {
                         int count = countFail.incrementAndGet();
                         log.error( "failed to dequeue", e );
                         if ( count > 200 ) {
                             log.error( "Shutting down index drain due to repetitive failures" );
-                            //break;
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index f9999b2..db1f50e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -46,7 +46,7 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
     private final BufferQueue bufferQueue;
 
     @Inject
-    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, IndexFig fig, final BufferQueue bufferQueue ){
+    public EsIndexBufferProducerImpl( MetricsFactory metricsFactory, final BufferQueue bufferQueue ){
         this.bufferQueue = bufferQueue;
         this.indexSizeCounter = metricsFactory.getCounter(EsIndexBufferProducerImpl.class, "index.buffer.size");
         this.timer =  metricsFactory.getTimer(EsIndexBufferProducerImpl.class,"index.buffer.producer.timer");

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd0015d5/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4d68dda..7e2312d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.guice;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
 
 
 public class TestIndexModule extends TestModule {
@@ -32,6 +34,11 @@ public class TestIndexModule extends TestModule {
 
         // configure collections and our core astyanax framework
         install( new CollectionModule() );
-        install( new IndexModule() );
+        install( new IndexModule() {
+            @Override
+            public void wireBufferQueue() {
+                bind( BufferQueue.class).to( BufferQueueInMemory.class );
+            }
+        } );
     }
 }


[5/5] incubator-usergrid git commit: Refactored to write to cassandra for data, and to SQS for the id of the data.

Posted by to...@apache.org.
Refactored to write to cassandra for data, and to SQS for the id of the data.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8d8eb060
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8d8eb060
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8d8eb060

Branch: refs/heads/USERGRID-466
Commit: 8d8eb060e20bd9942d591227ae2cdd5337214b81
Parents: 9111d94
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 20:38:52 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 20:38:52 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/map/MapManager.java    |  35 ++--
 .../persistence/map/impl/MapManagerImpl.java    |   8 +
 .../persistence/map/impl/MapSerialization.java  |   9 +
 .../map/impl/MapSerializationImpl.java          |  93 +++++++++
 .../persistence/map/MapManagerTest.java         |  49 ++++-
 stack/corepersistence/queryindex/pom.xml        |   1 -
 .../index/IndexOperationMessage.java            |  36 +++-
 .../index/impl/BufferQueueSQSImpl.java          | 167 ++++++++++++++--
 .../persistence/index/impl/DeIndexRequest.java  |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 196 +++++++++++--------
 .../index/guice/TestIndexModule.java            |   4 +-
 .../index/impl/BufferQueueSQSImplTest.java      | 145 ++++++++++++++
 .../impl/EntityConnectionIndexImplTest.java     |   4 +-
 .../persistence/index/impl/EntityIndexTest.java |  21 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  32 ++-
 15 files changed, 656 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index 62fe57d..69e0874 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -1,23 +1,26 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 
@@ -33,6 +36,14 @@ public interface MapManager {
      */
     public String getString( final String key );
 
+
+    /**
+     * Get the values for all the keys.  If a value does not exist, it won't be present in the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings(final Collection<String> keys);
+
     /**
      * Return the string, null if not found
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
index c077c7d..fb2e7ff 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerImpl.java
@@ -18,6 +18,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.map.MapManager;
@@ -51,6 +53,12 @@ public class MapManagerImpl implements MapManager {
 
 
     @Override
+    public Map<String, String> getStrings( final Collection<String> keys ) {
+        return mapSerialization.getStrings( scope, keys );
+    }
+
+
+    @Override
     public void putString( final String key, final String value ) {
           mapSerialization.putString( scope, key, value );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
index 6e7e328..2e958c2 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerialization.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.persistence.map.impl;
 
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
@@ -33,6 +35,13 @@ public interface MapSerialization extends Migration {
     public String getString( final MapScope scope, final String key );
 
     /**
+     * Get strings from the map
+     * @param keys
+     * @return
+     */
+    public Map<String, String> getStrings( final MapScope scope, final Collection<String> keys );
+
+    /**
      * Return the string, null if not found
      */
     public void putString( final MapScope scope, final String key, final String value );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 715c202..825d636 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -18,9 +18,12 @@
  */
 
 package org.apache.usergrid.persistence.map.impl;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.base.Preconditions;
@@ -50,6 +53,9 @@ import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
 import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.model.Row;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.ColumnFamilyQuery;
 import com.netflix.astyanax.serializers.BooleanSerializer;
 import com.netflix.astyanax.serializers.StringSerializer;
 
@@ -73,6 +79,9 @@ public class MapSerializationImpl implements MapSerialization {
         private static final StringSerializer STRING_SERIALIZER = StringSerializer.get();
 
 
+    private static final StringResultsBuilder STRING_RESULTS_BUILDER = new StringResultsBuilder();
+
+
         /**
          * CFs where the row key contains the source node id
          */
@@ -126,6 +135,12 @@ public class MapSerializationImpl implements MapSerialization {
 
 
     @Override
+    public Map<String, String> getStrings(final MapScope scope,  final Collection<String> keys ) {
+        return getValues( scope, keys, STRING_RESULTS_BUILDER );
+    }
+
+
+    @Override
     public void putString( final MapScope scope, final String key, final String value ) {
         final RowOp op = new RowOp() {
             @Override
@@ -371,6 +386,49 @@ public class MapSerializationImpl implements MapSerialization {
     }
 
 
+    /**
+     * Get multiple values, using the supplied results builder
+     * @param scope
+     * @param keys
+     * @param builder
+     * @param <T>
+     * @return
+     */
+    private <T> T getValues(final MapScope scope, final Collection<String> keys, final ResultsBuilder<T> builder) {
+
+
+        final List<ScopedRowKey<MapEntryKey>> rowKeys = new ArrayList<>( keys.size() );
+
+        for(final String key: keys){
+             //add it to the entry
+            final ScopedRowKey<MapEntryKey> entryRowKey = MapEntryKey.fromKey(scope, key);
+
+            rowKeys.add( entryRowKey );
+
+        }
+
+
+
+          //now get all columns, including the "old row key value"
+          try {
+              final Rows<ScopedRowKey<MapEntryKey>, Boolean>
+                  rows = keyspace.prepareQuery( MAP_ENTRIES ).getKeySlice( rowKeys ).withColumnSlice( true )
+                                                     .execute().getResult();
+
+
+             return builder.buildResults( rows );
+          }
+          catch ( NotFoundException nfe ) {
+              //nothing to return
+              return null;
+          }
+          catch ( ConnectionException e ) {
+              throw new RuntimeException( "Unable to connect to cassandra", e );
+          }
+      }
+
+
+
     private void executeBatch(MutationBatch batch) {
         try {
             batch.execute();
@@ -449,4 +507,39 @@ public class MapSerializationImpl implements MapSerialization {
             return ScopedRowKey.fromKey( mapScope.getApplication(), new MapEntryKey( mapScope.getName(), key ) );
         }
     }
+
+
+    /**
+     * Build the results from the row keys
+     * @param <T>
+     */
+    private static interface ResultsBuilder<T> {
+
+        public T buildResults(final  Rows<ScopedRowKey<MapEntryKey>, Boolean> rows);
+    }
+
+    public static class StringResultsBuilder implements ResultsBuilder<Map<String, String>>{
+
+        @Override
+        public Map<String, String> buildResults( final Rows<ScopedRowKey<MapEntryKey>, Boolean> rows ) {
+            final int size = rows.size();
+
+            final Map<String, String> results = new HashMap<>(size);
+
+            for(int i = 0; i < size; i ++){
+
+                final Row<ScopedRowKey<MapEntryKey>, Boolean> row = rows.getRowByIndex( i );
+
+                final String value = row.getColumns().getStringValue( true, null );
+
+                if(value == null){
+                    continue;
+                }
+
+               results.put( row.getKey().getKey().key,  value );
+            }
+
+            return results;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
index df4394e..41286ab 100644
--- a/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
+++ b/stack/corepersistence/map/src/test/java/org/apache/usergrid/persistence/map/MapManagerTest.java
@@ -20,6 +20,9 @@
 package org.apache.usergrid.persistence.map;
 
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -34,9 +37,11 @@ import org.apache.usergrid.persistence.core.test.UseModules;
 import org.apache.usergrid.persistence.map.guice.TestMapModule;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import com.google.inject.Inject;
 
+import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
@@ -79,6 +84,47 @@ public class MapManagerTest {
 
 
     @Test
+    public void multiReadNoKey() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key = UUIDGenerator.newTimeUUID().toString();
+
+        final Map<String, String> results = mm.getStrings( Collections.singleton( key ) );
+
+        assertNotNull( results );
+
+        final String shouldBeMissing = results.get( key );
+
+        assertNull( shouldBeMissing );
+    }
+
+
+    @Test
+    public void writeReadStringBatch() {
+        MapManager mm = mmf.createMapManager( this.scope );
+
+        final String key1 = "key1";
+        final String value1 = "value1";
+
+        mm.putString( key1, value1 );
+
+
+        final String key2 = "key2";
+        final String value2 = "value2";
+
+        mm.putString( key2, value2 );
+
+
+        final Map<String, String> returned = mm.getStrings( Arrays.asList( key1, key2 ) );
+
+        assertNotNull( returned );
+
+        assertEquals( value1, returned.get( key1 ) );
+        assertEquals( value2, returned.get( key2 ) );
+    }
+
+
+    @Test
     public void writeReadStringTTL() throws InterruptedException {
 
         MapManager mm = mmf.createMapManager( this.scope );
@@ -106,8 +152,7 @@ public class MapManagerTest {
         //now read it should be gone
         final String timedOut = mm.getString( key );
 
-        assertNull("Value was not returned", timedOut);
-
+        assertNull( "Value was not returned", timedOut );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index a5fbf6a..af843ad 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -145,7 +145,6 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
             <version>${slf4j.version}</version>
-            <scope>test</scope>
         </dependency>
 
         <!-- common stuff, logging, etc.-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 7d8a859..a7388d6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -60,17 +60,17 @@ public class IndexOperationMessage implements Serializable {
 
 
     public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
-        indexRequests.addAll( indexRequests );
+        this.indexRequests.addAll( indexRequests );
     }
 
 
     public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
-        deIndexRequests.add( deIndexRequest );
+        this.deIndexRequests.add( deIndexRequest );
     }
 
 
     public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
-        deIndexRequests.addAll( deIndexRequests );
+        this.deIndexRequests.addAll( deIndexRequests );
     }
 
 
@@ -96,4 +96,34 @@ public class IndexOperationMessage implements Serializable {
     public BetterFuture<IndexOperationMessage> getFuture() {
         return containerFuture;
     }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final IndexOperationMessage that = ( IndexOperationMessage ) o;
+
+        if ( !deIndexRequests.equals( that.deIndexRequests ) ) {
+            return false;
+        }
+        if ( !indexRequests.equals( that.indexRequests ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = indexRequests.hashCode();
+        result = 31 * result + deIndexRequests.hashCode();
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index 833e045..c6acb36 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -23,56 +23,132 @@ package org.apache.usergrid.persistence.index.impl;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 
+/**
+ * This is experimental at best.  Our SQS size limit is a problem.  We shouldn't use this for index operation. Only for
+ * performing
+ */
 @Singleton
 public class BufferQueueSQSImpl implements BufferQueue {
 
+    private static final Logger logger = LoggerFactory.getLogger( BufferQueueSQSImpl.class );
+
     /** Hacky, copied from CPEntityManager b/c we can't access it here */
     public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
 
 
+    /**
+     * Set our TTL to 1 month.  This is high, but in the event of a bug, we want these entries to get removed
+     */
+    public static final int TTL = 60 * 60 * 24 * 30;
+
+    /**
+     * The name to put in the map
+     */
+    public static final String MAP_NAME = "esqueuedata";
+
+
     private static final String QUEUE_NAME = "es_queue";
 
+    private static SmileFactory SMILE_FACTORY = new SmileFactory();
+
+
+    static {
+        SMILE_FACTORY.delegateToTextual( true );
+    }
+
+
     private final QueueManager queue;
+    private final MapManager mapManager;
     private final IndexFig indexFig;
+    private final ObjectMapper mapper;
+    private final Meter readMeter;
+    private final Timer readTimer;
+    private final Meter writeMeter;
+    private final Timer writeTimer;
 
 
     @Inject
-    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
-        final QueueScope scope =
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig,
+                               final MapManagerFactory mapManagerFactory, final MetricsFactory metricsFactory ) {
+        final QueueScope queueScope =
             new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
 
-        this.queue = queueManagerFactory.getQueueManager( scope );
+        this.queue = queueManagerFactory.getQueueManager( queueScope );
         this.indexFig = indexFig;
+
+        final MapScope scope = new MapScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), MAP_NAME );
+
+        this.mapManager = mapManagerFactory.createMapManager( scope );
+
+
+        this.writeTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "write.timer" );
+        this.writeMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "write.meter" );
+
+        this.readTimer = metricsFactory.getTimer( BufferQueueSQSImpl.class, "read.timer" );
+        this.readMeter = metricsFactory.getMeter( BufferQueueSQSImpl.class, "read.meter" );
+
+        this.mapper = new ObjectMapper( SMILE_FACTORY );
+        //pretty print, disabling for speed
+        //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+
     }
 
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
 
+        final Timer.Context timer = this.writeTimer.time();
+        this.writeMeter.mark();
+
+        final UUID identifier = UUIDGenerator.newTimeUUID();
 
         try {
-            this.queue.sendMessage( operation );
+
+            final String payLoad = toString( operation );
+
+            //write to cassandra
+            this.mapManager.putString( identifier.toString(), payLoad, TTL );
+
+            //signal to SQS
+            this.queue.sendMessage( identifier );
             operation.getFuture().run();
         }
         catch ( IOException e ) {
             throw new RuntimeException( "Unable to queue message", e );
         }
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -83,23 +159,71 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
         final int actualTake = Math.min( 10, takeSize );
 
-        List<QueueMessage> messages = queue
-            .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                IndexOperationMessage.class );
+        final Timer.Context timer = this.readTimer.time();
+
+        try {
 
+            List<QueueMessage> messages = queue
+                .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                    String.class );
 
-        final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-        for ( final QueueMessage message : messages ) {
 
-            final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
+            final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
-            SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message,  messageBody );
+            final List<String> mapEntries = new ArrayList<>( messages.size() );
 
-            response.add( operation );
-        }
 
-        return response;
+            if(messages.size() == 0){
+                return response;
+            }
+
+            //add all our keys  for a single round trip
+            for ( final QueueMessage message : messages ) {
+                mapEntries.add( message.getBody().toString() );
+            }
+
+            //look up the values
+            final Map<String, String> values = mapManager.getStrings( mapEntries );
+
+
+            //load them into our response
+            for ( final QueueMessage message : messages ) {
+
+                final String key = message.getBody().toString();
+
+                //now see if the key was there
+                final String payload = values.get( key );
+
+                //the entry was not present in cassandra, ignore this message.  Failure should eventually kick it to
+                // a DLQ
+
+                if ( payload == null ) {
+                    continue;
+                }
+
+                final IndexOperationMessage messageBody;
+
+                try {
+                    messageBody = fromString( payload );
+                }
+                catch ( IOException e ) {
+                    logger.error( "Unable to deserialize message from string.  This is a bug", e );
+                    throw new RuntimeException( "Unable to deserialize message from string.  This is a bug", e );
+                }
+
+                SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message, messageBody );
+
+                response.add( operation );
+            }
+
+            readMeter.mark( response.size() );
+            return response;
+        }
+        //stop our timer
+        finally {
+            timer.stop();
+        }
     }
 
 
@@ -107,7 +231,7 @@ public class BufferQueueSQSImpl implements BufferQueue {
     public void ack( final List<IndexOperationMessage> messages ) {
 
         //nothing to do
-        if(messages.size() == 0){
+        if ( messages.size() == 0 ) {
             return;
         }
 
@@ -121,6 +245,19 @@ public class BufferQueueSQSImpl implements BufferQueue {
     }
 
 
+    /** Deserialize an IndexOperationMessage from its string form using the Jackson mapper. */
+    private IndexOperationMessage fromString( String s ) throws IOException {
+        IndexOperationMessage o = mapper.readValue( s, IndexOperationMessage.class );
+        return o;
+    }
+
+
+    /** Serialize the IndexOperationMessage to a string using the Jackson mapper. */
+    private String toString( IndexOperationMessage o ) throws IOException {
+        return mapper.writeValueAsString( o );
+    }
+
+
     /**
      * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index c63c4df..9f3ce66 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -37,9 +37,9 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 @JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public class DeIndexRequest implements BatchRequest {
 
-    public final String[] indexes;
-    public final String entityType;
-    public final String documentId;
+    public String[] indexes;
+    public String entityType;
+    public String documentId;
 
 
     public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
@@ -49,6 +49,10 @@ public class DeIndexRequest implements BatchRequest {
     }
 
 
+    public DeIndexRequest() {
+    }
+
+
     @Override
     public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 8547889..3fc3e77 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -42,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
+import rx.Subscription;
 import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
@@ -60,17 +62,23 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     private final IndexFig config;
     private final FailureMonitorImpl failureMonitor;
     private final Client client;
-    private final Observable<List<IndexOperationMessage>> consumer;
+
     private final Timer flushTimer;
     private final Counter indexSizeCounter;
     private final Meter flushMeter;
     private final Timer produceTimer;
     private final BufferQueue bufferQueue;
 
+    //the actively running subscription
+    private Subscription subscription;
+
+    private  Observable<List<IndexOperationMessage>> consumer;
+
+    private Object mutex = new Object();
+
     @Inject
     public EsIndexBufferConsumerImpl( final IndexFig config,  final EsProvider
         provider, final MetricsFactory metricsFactory,   final BufferQueue bufferQueue ){
-        this.bufferQueue = bufferQueue;
         this.flushTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class, "index.buffer.flush");
         this.flushMeter = metricsFactory.getMeter(EsIndexBufferConsumerImpl.class, "index.buffer.meter");
         this.indexSizeCounter =  metricsFactory.getCounter(EsIndexBufferConsumerImpl.class, "index.buffer.size");
@@ -78,81 +86,101 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.failureMonitor = new FailureMonitorImpl(config,provider);
         this.client = provider.getClient();
         this.produceTimer = metricsFactory.getTimer(EsIndexBufferConsumerImpl.class,"index.buffer.consumer.messageFetch");
+        this.bufferQueue = bufferQueue;
+
 
 
-        final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
-            @Override
-            public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
+          start();
+    }
+
+
+    public void start() {
+        synchronized ( mutex) {
 
-                //name our thread so it's easy to see
-                Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+            final AtomicInteger countFail = new AtomicInteger();
 
-                List<IndexOperationMessage> drainList;
-                do {
-                    try {
+            Observable<List<IndexOperationMessage>> consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
+                @Override
+                public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
+                    //name our thread so it's easy to see
+                    Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
 
-                        Timer.Context timer = produceTimer.time();
+                    List<IndexOperationMessage> drainList;
+                    do {
+                        try {
 
-                        drainList = bufferQueue
-                            .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
 
+                            Timer.Context timer = produceTimer.time();
 
-                        subscriber.onNext( drainList );
+                            drainList = bufferQueue.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
+                                TimeUnit.MILLISECONDS );
 
 
-                        timer.stop();
+                            subscriber.onNext( drainList );
 
-                        countFail.set( 0 );
-                    }
-                    catch ( EsRejectedExecutionException err ) {
-                        countFail.incrementAndGet();
-                        log.error(
-                            "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  "
-                                + "Failed {} consecutive times",
-                            config.getFailRefreshCount(), countFail.get() );
-
-                        //es  rejected the exception, sleep and retry in the queue
-                        try {
-                            Thread.sleep( config.getFailureRetryTime() );
+
+                            timer.stop();
+
+                            countFail.set( 0 );
                         }
-                        catch ( InterruptedException e ) {
-                            //swallow
+                        catch ( EsRejectedExecutionException err ) {
+                            countFail.incrementAndGet();
+                            log.error(
+                                "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  " + "Failed {} consecutive times", config.getFailRefreshCount(),
+                                countFail.get() );
+
+                            //es rejected the request, sleep and retry in the queue
+                            try {
+                                Thread.sleep( config.getFailureRetryTime() );
+                            }
+                            catch ( InterruptedException e ) {
+                                //swallow
+                            }
                         }
-                    }
-                    catch ( Exception e ) {
-                        int count = countFail.incrementAndGet();
-                        log.error( "failed to dequeue", e );
-                        if ( count > 200 ) {
-                            log.error( "Shutting down index drain due to repetitive failures" );
+                        catch ( Exception e ) {
+                            int count = countFail.incrementAndGet();
+                            log.error( "failed to dequeue", e );
+                            if ( count > 200 ) {
+                                log.error( "Shutting down index drain due to repetitive failures" );
+                            }
                         }
                     }
+                    while ( true );
                 }
-                while ( true );
-            }
-        } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
-            @Override
-            public void call( List<IndexOperationMessage> containerList ) {
-                if ( containerList.size() > 0 ) {
-                    flushMeter.mark( containerList.size() );
-                    Timer.Context time = flushTimer.time();
-                    execute( containerList );
-                    time.stop();
+            } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                @Override
+                public void call( List<IndexOperationMessage> containerList ) {
+                    if ( containerList.size() > 0 ) {
+                        flushMeter.mark( containerList.size() );
+                        Timer.Context time = flushTimer.time();
+                        execute( containerList );
+                        time.stop();
+                    }
                 }
+            } )
+                //ack after we process
+                .doOnNext( new Action1<List<IndexOperationMessage>>() {
+                    @Override
+                    public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+                        bufferQueue.ack( indexOperationMessages );
+                    }
+                } );
+
+            //start in the background
+            subscription = consumer.subscribe();
+        }
+    }
+
+
+    public void stop() {
+        synchronized ( mutex ) {
+            //stop consuming
+            if(subscription != null) {
+                subscription.unsubscribe();
             }
-} )
-            //ack after we process
-          .doOnNext( new Action1<List<IndexOperationMessage>>() {
-              @Override
-              public void call( final List<IndexOperationMessage> indexOperationMessages ) {
-                  bufferQueue.ack( indexOperationMessages );
-              }
-          } );
-
-        //start in the background
-        consumer.subscribe();
+        }
     }
 
     /**
@@ -165,43 +193,36 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
-            .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
-                @Override
-                public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
-                    final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
-                    final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
-
-                    return Observable.merge( deIndex, index );
-                }
-            } );
-
+        Observable<IndexOperationMessage> flattenMessages = Observable.from( operationMessages );
 
 
         //batch shard operations into a bulk request
-        flattenMessages.toList()
-            .doOnNext(new Action1<List<BatchRequest>>() {
-                @Override
-                public void call(List<BatchRequest> builders) {
-                    try {
-                        final BulkRequestBuilder bulkRequest = initRequest();
-                        for (BatchRequest builder : builders) {
-                            indexSizeCounter.dec();
+        flattenMessages.flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
+            @Override
+            public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
+                final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
+                final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
 
-                            builder.doOperation( client, bulkRequest );
-                        }
-                        sendRequest(bulkRequest);
-                    }catch (Exception e){
-                        log.error("Failed while sending bulk",e);
-                    }
-                }
-            })
-            .toBlocking().lastOrDefault(null);
+                return Observable.merge( index, deIndex );
+            }
+        } )
+      //collect all the operations into a single bulk request
+       .collect( initRequest(), new Action2<BulkRequestBuilder, BatchRequest>() {
+           @Override
+           public void call( final BulkRequestBuilder bulkRequestBuilder, final BatchRequest batchRequest ) {
+               batchRequest.doOperation( client, bulkRequestBuilder );
+           }
+       } )
+        //send the request off to ES
+        .doOnNext( new Action1<BulkRequestBuilder>() {
+            @Override
+            public void call( final BulkRequestBuilder bulkRequestBuilder ) {
+                sendRequest( bulkRequestBuilder );
+            }
+        } ).toBlocking().last();
 
         //call back all futures
         Observable.from(operationMessages)
-            .subscribeOn(Schedulers.io())
             .doOnNext(new Action1<IndexOperationMessage>() {
                 @Override
                 public void call(IndexOperationMessage operationMessage) {
@@ -211,6 +232,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .toBlocking().lastOrDefault(null);
     }
 
+
     /**
      * initialize request
      * @return

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 57c2fab..7d7a18d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -38,8 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
-//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
new file mode 100644
index 0000000..4a4672e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImplTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+
+import com.google.inject.Inject;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import static org.junit.Assert.*;
+
+
+
+
+@RunWith(EsRunner.class)
+@UseModules({ TestIndexModule.class })
+@NotThreadSafe
+public class BufferQueueSQSImplTest {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+    @Inject
+    private BufferQueueSQSImpl bufferQueueSQS;
+
+    @Inject
+    private EsIndexBufferConsumerImpl esIndexBufferConsumer;
+
+
+    @Before
+    public void stop() {
+        esIndexBufferConsumer.stop();
+    }
+
+
+    @After
+    public void after() {
+        esIndexBufferConsumer.start();
+    }
+
+
+
+
+    @Test
+    public void testMessageIndexing(){
+
+        final Map<String, Object> request1Data  = new HashMap<String, Object>() {{put("test", "testval1");}};
+        final IndexRequest indexRequest1 =  new IndexRequest( "testAlias1", "testType1", "testDoc1",request1Data );
+
+
+        final Map<String, Object> request2Data  = new HashMap<String, Object>() {{put("test", "testval2");}};
+        final IndexRequest indexRequest2 =  new IndexRequest( "testAlias2", "testType2", "testDoc2",request2Data );
+
+
+        //de-index request
+        final DeIndexRequest deIndexRequest1 = new DeIndexRequest( new String[]{"index1.1, index1.2"}, "testType3", "testId3" );
+
+        final DeIndexRequest deIndexRequest2 = new DeIndexRequest( new String[]{"index2.1", "index2.1"}, "testType4", "testId4" );
+
+
+
+
+        IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+        indexOperationMessage.addIndexRequest( indexRequest1);
+        indexOperationMessage.addIndexRequest( indexRequest2);
+
+        indexOperationMessage.addDeIndexRequest( deIndexRequest1 );
+        indexOperationMessage.addDeIndexRequest( deIndexRequest2 );
+
+        bufferQueueSQS.offer( indexOperationMessage );
+
+        //wait for it to send to SQS
+        indexOperationMessage.getFuture().get();
+
+        //now get it back
+
+        final List<IndexOperationMessage> ops = bufferQueueSQS.take( 10,  20, TimeUnit.SECONDS );
+
+        assertTrue(ops.size() > 1);
+
+        final IndexOperationMessage returnedOperation = ops.get( 0 );
+
+         //get the operations out
+
+        final Set<IndexRequest> indexRequestSet = returnedOperation.getIndexRequests();
+
+        assertTrue(indexRequestSet.contains(indexRequest1));
+        assertTrue(indexRequestSet.contains(indexRequest2));
+
+
+        final Set<DeIndexRequest> deIndexRequests = returnedOperation.getDeIndexRequests();
+
+        assertTrue( deIndexRequests.contains( deIndexRequest1 ) );
+        assertTrue( deIndexRequests.contains( deIndexRequest2 ) );
+
+
+
+        //now ack the message
+
+        bufferQueueSQS.ack( ops );
+
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 215ff57..c5f3488 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
         EsTestUtils.waitForTasks(personLikesIndex);
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 30000 );
+        Thread.sleep( 2000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index a15053c..a2135a3 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,12 +24,14 @@ import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.model.field.ArrayField;
 import org.apache.usergrid.persistence.model.field.EntityObjectField;
 import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -69,8 +71,14 @@ public class EntityIndexTest extends BaseIT {
     @Inject
     public EntityIndexFactory eif;
 
+    //TODO T.N. Remove this when we move the cursor mapping back to core
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
     @Test
-    public void testIndex() throws IOException {
+    public void testIndex() throws IOException, InterruptedException {
         Id appId = new SimpleId( "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -86,7 +94,9 @@ public class EntityIndexTest extends BaseIT {
 
         entityIndex.refresh();
 
-        testQueries( indexScope, searchTypes,  entityIndex );
+        Thread.sleep(100000000);
+
+        testQueries( indexScope, searchTypes, entityIndex );
     }
 
     @Test
@@ -660,15 +670,12 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < size; i++ ) {
             final String middleName = "middleName" + UUIDUtils.newTimeUUID();
-            Map<String, Object> properties = new LinkedHashMap<String, Object>();
-            properties.put( "username", "edanuff" );
-            properties.put( "email", "ed@anuff.com" );
-            properties.put( "middlename", middleName );
 
             Map entityMap = new HashMap() {{
                 put( "username", "edanuff" );
                 put( "email", "ed@anuff.com" );
                 put( "middlename", middleName );
+                put( "created", System.nanoTime() );
             }};
 
             final Id userId = new SimpleId( "user" );
@@ -700,7 +707,7 @@ public class EntityIndexTest extends BaseIT {
 
         for ( int i = 0; i < expectedPages; i++ ) {
             //**
-            final Query query = Query.fromQL( "select *" );
+            final Query query = Query.fromQL( "select * order by created" );
             query.setLimit( limit );
 
             if ( cursor != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8d8eb060/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 10aa621..e2c5c1e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -48,20 +48,19 @@ import java.util.concurrent.TimeUnit;
 public class SQSQueueManagerImpl implements QueueManager {
     private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
-    private  AmazonSQSClient sqs;
-    private  QueueScope scope;
-    private  QueueFig fig;
+    private  final AmazonSQSClient sqs;
+    private  final QueueScope scope;
     private  ObjectMapper mapper;
     private static SmileFactory smileFactory = new SmileFactory();
 
-    private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
+    private LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
             .maximumSize(1000)
             .build(new CacheLoader<SqsLoader, Queue>() {
                        @Override
                        public Queue load(SqsLoader queueLoader) throws Exception {
                            Queue queue = null;
                            try {
-                               GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+                               GetQueueUrlResult result = sqs.getQueueUrl(queueLoader.getKey());
                                queue = new Queue(result.getQueueUrl());
                            } catch (QueueDoesNotExistException queueDoesNotExistException) {
                                queue = null;
@@ -73,7 +72,7 @@ public class SQSQueueManagerImpl implements QueueManager {
                                String name = queueLoader.getKey();
                                CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                                        .withQueueName(name);
-                               CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+                               CreateQueueResult result = sqs.createQueue(createQueueRequest);
                                String url = result.getQueueUrl();
                                queue = new Queue(url);
                                LOG.info("Created queue with url {}", url);
@@ -85,7 +84,6 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
-        this.fig = fig;
         this.scope = scope;
         try {
             UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
@@ -99,8 +97,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
-            LOG.warn("failed to setup SQS",e);
-//            throw new RuntimeException("Error setting up mapper", e);
+            throw new RuntimeException("Error setting up mapper", e);
         }
     }
 
@@ -127,14 +124,14 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
-        LOG.info("Getting {} messages from {}", limit, url);
+        LOG.debug( "Getting {} messages from {}", limit, url);
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
         receiveMessageRequest.setWaitTimeSeconds(waitTime);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
-        LOG.info("Received {} messages from {}",messages.size(),url);
+        LOG.debug( "Received {} messages from {}", messages.size(), url);
         List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
         for (Message message : messages) {
             Object body ;
@@ -157,7 +154,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Messages...{} to {}", bodies.size(), url);
+        LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
 
         SendMessageBatchRequest request = new SendMessageBatchRequest(url);
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
@@ -180,7 +177,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return;
         }
         String url = getQueue().getUrl();
-        LOG.info("Sending Message...{} to {}",body.toString(),url);
+        LOG.debug( "Sending Message...{} to {}", body.toString(), url);
 
         final String stringBody = toString(body);
 
@@ -192,7 +189,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessage(QueueMessage queueMessage) {
         String url = getQueue().getUrl();
-        LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
+        LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
                 .withQueueUrl(url)
@@ -203,7 +200,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessages(List<QueueMessage> queueMessages) {
         String url = getQueue().getUrl();
-        LOG.info("Commit messages {} to queue {}",queueMessages.size(),url);
+        LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
         for(QueueMessage message : queueMessages){
             entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
@@ -233,16 +230,11 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public class SqsLoader {
         private final String key;
-        private final AmazonSQSClient client;
 
         public SqsLoader(String key, AmazonSQSClient client) {
             this.key = key;
-            this.client = client;
         }
 
-        public AmazonSQSClient getClient() {
-            return client;
-        }
 
         public String getKey() {
             return key;


[3/5] incubator-usergrid git commit: Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466

Posted by to...@apache.org.
Merge branch 'USERGRID-466' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-466


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c47f32a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c47f32a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c47f32a7

Branch: refs/heads/USERGRID-466
Commit: c47f32a72d3ab322985e66e893d1622af3be16fc
Parents: 2391974 31bee37
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:48 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:48 2015 -0600

----------------------------------------------------------------------
 .../apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java   | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c47f32a7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------


[2/5] incubator-usergrid git commit: Refactored index messages to be serializable.

Posted by to...@apache.org.
Refactored index messages to be serializable.

Consumers now roll up index requests and flush them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23919745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23919745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23919745

Branch: refs/heads/USERGRID-466
Commit: 23919745883ff9d60c18ce75fc1a056a017bce37
Parents: cd0015d
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:34 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/queryindex/pom.xml        |   8 +
 .../usergrid/persistence/index/IndexFig.java    |   7 +
 .../index/IndexOperationMessage.java            |  29 +++-
 .../persistence/index/guice/IndexModule.java    |   2 +
 .../persistence/index/impl/BatchRequest.java    |  41 +++++
 .../index/impl/BufferQueueInMemory.java         |  87 ----------
 .../index/impl/BufferQueueInMemoryImpl.java     |  87 ++++++++++
 .../index/impl/BufferQueueSQSImpl.java          | 158 +++++++++++++++++++
 .../persistence/index/impl/DeIndexRequest.java  | 106 +++++++++++++
 .../index/impl/EsEntityIndexBatchImpl.java      |  39 ++---
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  37 ++---
 .../persistence/index/impl/IndexRequest.java    | 117 ++++++++++++++
 .../index/guice/TestIndexModule.java            |   6 +-
 .../persistence/queue/QueueManager.java         |   4 +-
 15 files changed, 581 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index f6ae718..a5fbf6a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -108,6 +108,14 @@
         </dependency>
 
         <dependency>
+                  <groupId>${project.parent.groupId}</groupId>
+                  <artifactId>queue</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+              </dependency>
+
+
+        <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>${elasticsearch.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index befbaa9..ce14449 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -55,6 +55,8 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
+    public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
     public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
     public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -167,4 +169,9 @@ public interface IndexFig extends GuicyFig {
     @Default("1000")
     @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
     long getFailureRetryTime();
+
+    //give us 60 seconds to process the message
+    @Default("60")
+    @Key(INDEX_QUEUE_READ_TIMEOUT)
+    int getIndexQueueTimeout();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 944a71f..43eaa01 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,23 +17,28 @@
 package org.apache.usergrid.persistence.index;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.BatchRequest;
+
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 
+import java.io.Serializable;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Container for index operations.
  */
-public  class IndexOperationMessage {
-    private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
+public  class IndexOperationMessage implements Serializable {
+    private final Set<BatchRequest> builders;
     private final BetterFuture<IndexOperationMessage> containerFuture;
 
     public IndexOperationMessage(){
         final IndexOperationMessage parent = this;
-        builders = new ConcurrentLinkedQueue<>();
+        this.builders = new HashSet<>();
         this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
             @Override
             public IndexOperationMessage call() throws Exception {
@@ -42,7 +47,21 @@ public  class IndexOperationMessage {
         });
     }
 
-    public void addOperation(ActionRequestBuilder builder){
+
+    /**
+     * Add all our operations in the set
+     * @param requests
+     */
+    public void setOperations(final Set<BatchRequest> requests){
+        this.builders.addAll( requests);
+    }
+
+
+    /**
+     * Add the operation to the set
+     * @param builder
+     */
+    public void addOperation(BatchRequest builder){
         builders.add(builder);
     }
 
@@ -50,7 +69,7 @@ public  class IndexOperationMessage {
      * return operations for the message
      * @return
      */
-    public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
+    public Set<BatchRequest> getOperations(){
         return builders;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index d911dab..b03e1c0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
 import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
 
 import org.safehaus.guicyfig.GuicyFigModule;
 
@@ -41,6 +42,7 @@ public abstract class IndexModule extends AbstractModule {
         install(new GuicyFigModule(IndexFig.class));
 
         install(new MapModule());
+        install(new QueueModule());
 
 
         bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
new file mode 100644
index 0000000..df6c5b0
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.Serializable;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * A batch request we can serialize and construct on receive
+ */
+public interface BatchRequest extends Serializable {
+
+
+    /**
+     * Passing the client and the bulk request, add ourselves to the bulk request
+     * @param client
+     * @param bulkRequest
+     */
+    public void doOperation(final Client client, final BulkRequestBuilder bulkRequest );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
deleted file mode 100644
index 403762f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class BufferQueueInMemory implements BufferQueue {
-
-    private final ArrayBlockingQueue<IndexOperationMessage> messages;
-
-
-    @Inject
-    public BufferQueueInMemory(final IndexFig fig ) {
-        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
-    }
-
-
-    @Override
-    public void offer( final IndexOperationMessage operation ) {
-        messages.offer( operation );
-    }
-
-
-    @Override
-    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
-        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
-
-        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
-
-        //loop until we're we're full or we time out
-        do {
-            try {
-                //we received 1, try to drain
-                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
-
-                //drain
-                if ( polled != null ) {
-                    response.add( polled );
-                    messages.drainTo( response, takeSize - response.size() );
-                }
-            }
-            catch ( InterruptedException ie ) {
-                //swallow
-
-            }
-        }
-        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
-
-        return response;
-    }
-
-
-    @Override
-    public void ack( final List<IndexOperationMessage> messages ) {
-         //no op for this
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..ef0ef5f
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+    @Inject
+    public BufferQueueInMemoryImpl( final IndexFig fig ) {
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        messages.offer( operation );
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        //loop until we're we're full or we time out
+        do {
+            try {
+                //we received 1, try to drain
+                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+                //drain
+                if ( polled != null ) {
+                    response.add( polled );
+                    messages.drainTo( response, takeSize - response.size() );
+                }
+            }
+            catch ( InterruptedException ie ) {
+                //swallow
+
+            }
+        }
+        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+         //no op for this
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..b814603
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+    /** Hacky, copied from CPEntityManager b/c we can't access it here */
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexFig indexFig;
+
+
+    @Inject
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
+        final QueueScope scope =
+            new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+
+        this.queue = queueManagerFactory.getQueueManager( scope );
+        this.indexFig = indexFig;
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        final Message toQueue = new Message( operation.getOperations() );
+
+
+
+
+        try {
+            this.queue.sendMessage( toQueue );
+            operation.getFuture().run();
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        //read up to takeSize messages, waiting at most the timeout for them to arrive
+        List<QueueMessage> messages = queue
+            .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                Message.class );
+
+
+        final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+        for ( final QueueMessage message : messages ) {
+
+            SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+
+            operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+
+            response.add( operation );
+        }
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+
+        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+        for(IndexOperationMessage ioe: messages){
+            toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+        }
+
+        queue.commitMessages( toAck );
+    }
+
+
+    /**
+     * The message to queue to SQS
+     */
+    public static final class Message implements Serializable {
+        private final Set<BatchRequest> data;
+
+
+        private Message( final Set<BatchRequest> data ) {this.data = data;}
+
+
+        public Set<BatchRequest> getData() {
+            return data;
+        }
+    }
+
+
+    /**
+     * The message that subclasses our IndexOperationMessage.  Holds a pointer to the original message
+     */
+    public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+        private final QueueMessage message;
+
+
+        public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+
+
+        /**
+         * Get the message from our queue
+         */
+        public QueueMessage getMessage() {
+            return message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
new file mode 100644
index 0000000..a279f16
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Arrays;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build a de-index (delete) request
+ */
+public class DeIndexRequest implements BatchRequest {
+
+    public final String[] indexes;
+    public final String entityType;
+    public final String documentId;
+
+
+    public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
+        this.indexes = indexes;
+        this.entityType = entityType;
+        this.documentId = documentId;
+    }
+
+
+    @Override
+    public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+
+
+        for(final String index: indexes) {
+            final DeleteRequestBuilder builder = client.prepareDelete( index, entityType, documentId);
+
+            bulkRequest.add( builder );
+        }
+    }
+
+
+    public String[] getIndexes() {
+        return indexes;
+    }
+
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+
+    public String getDocumentId() {
+        return documentId;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final DeIndexRequest that = ( DeIndexRequest ) o;
+
+        if ( !documentId.equals( that.documentId ) ) {
+            return false;
+        }
+        if ( !entityType.equals( that.entityType ) ) {
+            return false;
+        }
+        if ( !Arrays.equals( indexes, that.indexes ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = Arrays.hashCode( indexes );
+        result = 31 * result + entityType.hashCode();
+        result = 31 * result + documentId.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d987b29..b0c731e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -73,10 +73,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private final ApplicationScope applicationScope;
 
-    private final Client client;
-
-    private final boolean refresh;
-
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
@@ -85,21 +81,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final AliasedEntityIndex entityIndex;
     private IndexOperationMessage container;
 
-    private final Timer batchTimer;
 
 
-    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,
+    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope,
                                   final IndexBufferProducer indexBatchBufferProducer,final IndexFig config,
-                                  final AliasedEntityIndex entityIndex,final MetricsFactory metricsFactory ) {
+                                  final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
-        this.client = client;
         this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
-        this.refresh = config.isForcedRefresh();
-        this.batchTimer = metricsFactory.getTimer( EsEntityIndexBatchImpl.class, "entity.index.batch.timer" );
         //constrained
         this.container = new IndexOperationMessage();
     }
@@ -133,9 +125,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
         final String entityType = entity.getId().getType();
-        IndexRequestBuilder builder =
-                client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        container.addOperation(builder);
+
+
+        container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+
         return this;
     }
 
@@ -174,23 +167,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         if(indexes == null ||indexes.length == 0){
             indexes = new String[]{indexIdentifier.getIndex(null)};
         }
-        //get all indexes then flush everyone
-        Timer.Context timeDeindex = batchTimer.time();
-        Observable.from(indexes)
-               .map(new Func1<String, Object>() {
-                   @Override
-                   public Object call(String index) {
-                       try {
-                           DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           container.addOperation(builder);
-                       }catch (Exception e){
-                           log.error("failed to deindex",e);
-                           throw e;
-                       }
-                       return index;
-                   }
-               }).toBlocking().last();
-        timeDeindex.stop();
+
+        container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+
         log.debug("Deindexed Entity with index id " + indexId);
 
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a1a8ca7..7bdb41a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -400,7 +400,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch = new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
+                applicationScope, indexBatchBufferProducer, config, this );
         return batch;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 45c12a1..2342398 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -26,7 +26,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexBufferProducer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.elasticsearch.action.ActionRequestBuilder;
@@ -37,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.slf4j.Logger;
@@ -48,7 +46,6 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +85,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
             @Override
             public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+
+                //name our thread so it's easy to see
+                Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+
                 List<IndexOperationMessage> drainList;
                 do {
                     try {
@@ -130,7 +131,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 }
                 while ( true );
             }
-        } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+        } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
             config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
             @Override
             public void call( List<IndexOperationMessage> containerList ) {
@@ -157,37 +158,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
+        Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
             .subscribeOn(Schedulers.io())
-            .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
+            .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
                 @Override
-                public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
-                    return Observable.from(operationMessage.getOperations());
+                public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
+                    return Observable.from( operationMessage.getOperations() );
                 }
-            });
+            } );
 
 
 
         //batch shard operations into a bulk request
         flattenMessages
             .buffer(config.getIndexBatchSize())
-            .doOnNext(new Action1<List<ActionRequestBuilder>>() {
+            .doOnNext(new Action1<List<BatchRequest>>() {
                 @Override
-                public void call(List<ActionRequestBuilder> builders) {
+                public void call(List<BatchRequest> builders) {
                     try {
                         final BulkRequestBuilder bulkRequest = initRequest();
-                        for (ActionRequestBuilder builder : builders) {
+                        for (BatchRequest builder : builders) {
                             indexSizeCounter.dec();
-                            if (builder instanceof IndexRequestBuilder) {
-                                bulkRequest.add((IndexRequestBuilder) builder);
-                            }
-                            if (builder instanceof DeleteRequestBuilder) {
-                                bulkRequest.add((DeleteRequestBuilder) builder);
-                            }
-                            if(builder instanceof DeleteByQueryRequestBuilder){
-                                DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
-                                deleteByQueryRequestBuilder.get();
-                            }
+
+                            builder.doOperation( client, bulkRequest );
                         }
                         sendRequest(bulkRequest);
                     }catch (Exception e){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
new file mode 100644
index 0000000..381d005
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represent the properties required to build an index request
+ */
+public class IndexRequest implements BatchRequest {
+
+    public final  String writeAlias;
+    public final String entityType;
+    public final String documentId;
+
+    public final Map<String, Object> data;
+
+
+    public IndexRequest( final String writeAlias, final String entityType, final String documentId,
+                         final Map<String, Object> data ) {
+        this.writeAlias = writeAlias;
+        this.entityType = entityType;
+        this.documentId = documentId;
+        this.data = data;
+    }
+
+
+    public void  doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+        IndexRequestBuilder builder =
+                      client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+
+
+        bulkRequest.add( builder );
+
+    }
+
+
+    public String getWriteAlias() {
+        return writeAlias;
+    }
+
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+
+    public String getDocumentId() {
+        return documentId;
+    }
+
+
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final IndexRequest that = ( IndexRequest ) o;
+
+        if ( !data.equals( that.data ) ) {
+            return false;
+        }
+        if ( !documentId.equals( that.documentId ) ) {
+            return false;
+        }
+        if ( !entityType.equals( that.entityType ) ) {
+            return false;
+        }
+        if ( !writeAlias.equals( that.writeAlias ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = writeAlias.hashCode();
+        result = 31 * result + entityType.hashCode();
+        result = 31 * result + documentId.hashCode();
+        result = 31 * result + data.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7e2312d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -23,7 +23,8 @@ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
 
 
 public class TestIndexModule extends TestModule {
@@ -37,7 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemory.class );
+                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 223860e..dd044d2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -29,8 +29,8 @@ public interface QueueManager {
     /**
      * Read messages from queue
      * @param limit
-     * @param transactionTimeout timeout in ms
-     * @param waitTime wait time for next message in ms
+     * @param transactionTimeout timeout in seconds
+     * @param waitTime wait time for next message in milliseconds
      * @param klass class to cast the return from
      * @return List of Queue Messages
      */


[4/5] incubator-usergrid git commit: Changed consumer structure of buffer/wait timeout. This now happens implicitly in our queue take, and is no longer necessary. .

Posted by to...@apache.org.
Changed consumer structure of buffer/wait timeout.  This now happens implicitly in our queue take, and is no longer necessary.
.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9111d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9111d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9111d944

Branch: refs/heads/USERGRID-466
Commit: 9111d94481bc15490b477f9ca48dd2565ca0e9dd
Parents: c47f32a
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 17:25:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 17:25:23 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  9 ++-
 .../usergrid/persistence/index/IndexFig.java    | 47 ++++++-----
 .../index/IndexOperationMessage.java            | 83 ++++++++++++--------
 .../index/impl/BufferQueueInMemoryImpl.java     |  5 +-
 .../index/impl/BufferQueueSQSImpl.java          | 54 +++++--------
 .../persistence/index/impl/DeIndexRequest.java  |  5 ++
 .../index/impl/EsEntityIndexBatchImpl.java      | 12 ++-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 39 +++++----
 .../index/impl/EsIndexBufferProducerImpl.java   |  3 +-
 .../persistence/index/impl/IndexRequest.java    | 24 ++++--
 .../impl/EntityConnectionIndexImplTest.java     |  4 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  8 +-
 12 files changed, 174 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 8d99586..7b53d67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.slf4j.Logger;
@@ -69,7 +71,12 @@ public class CoreModule  extends AbstractModule {
         install( new CommonModule());
         install(new CollectionModule());
         install(new GraphModule());
-        install(new IndexModule());
+        install( new IndexModule() {
+            @Override
+            public void wireBufferQueue() {
+                bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
+            }
+        } );
         install(new MapModule());
         install(new QueueModule());
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index ce14449..cde86fd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.usergrid.persistence.index;
 
+
 import org.safehaus.guicyfig.Default;
 import org.safehaus.guicyfig.FigSingleton;
 import org.safehaus.guicyfig.GuicyFig;
@@ -55,8 +56,16 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
+    /**
+     * Amount of time to wait when reading from the queue
+     */
     public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
 
+    /**
+     * Amount of time to wait when reading from the queue in milliseconds
+     */
+    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
     public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
     public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -67,9 +76,10 @@ public interface IndexFig extends GuicyFig {
     public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
     /**
-     *  Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple backpressure
+     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
+     * backpressure
      */
-    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME =  "elasticsearch.rejected_retry_wait";
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
@@ -82,7 +92,7 @@ public interface IndexFig extends GuicyFig {
     int getPort();
 
     @Default( "usergrid" )
-    @Key( ELASTICSEARCH_CLUSTER_NAME)
+    @Key( ELASTICSEARCH_CLUSTER_NAME )
     String getClusterName();
 
     @Default( "usergrid" ) // no underbars allowed
@@ -111,15 +121,15 @@ public interface IndexFig extends GuicyFig {
     public boolean isForcedRefresh();
 
     /** Identify the client node with a unique name. */
-    @Default("default")
+    @Default( "default" )
     @Key( ELASTICSEARCH_NODENAME )
     public String getNodeName();
 
-    @Default("6")
+    @Default( "6" )
     @Key( ELASTICSEARCH_NUMBER_OF_SHARDS )
     public int getNumberOfShards();
 
-    @Default("1")
+    @Default( "1" )
     @Key( ELASTICSEARCH_NUMBER_OF_REPLICAS )
     public int getNumberOfReplicas();
 
@@ -127,51 +137,48 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_FAIL_REFRESH )
     int getFailRefreshCount();
 
-    @Default("2")
+    @Default( "2" )
     int getIndexCacheMaxWorkers();
 
     /**
      * how long to wait before the buffer flushes to send
-     * @return
      */
-    @Default("250")
+    @Default( "250" )
     @Key( INDEX_BUFFER_TIMEOUT )
     long getIndexBufferTimeout();
 
     /**
      * size of the buffer to build up before you send results
-     * @return
      */
-    @Default("1000")
+    @Default( "1000" )
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
 
     /**
      * size of the buffer to build up before you send results
-     * @return
      */
-    @Default("1000")
+    @Default( "1000" )
     @Key( INDEX_QUEUE_SIZE )
     int getIndexQueueSize();
 
     /**
      * Request batch size for ES
-     * @return
      */
-    @Default("1000")
-    @Key( INDEX_BATCH_SIZE)
+    @Default( "1000" )
+    @Key( INDEX_BATCH_SIZE )
     int getIndexBatchSize();
 
-    @Default("one")
+    @Default( "one" )
     @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
     String getWriteConsistencyLevel();
 
-    @Default("1000")
+    @Default( "1000" )
     @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
     long getFailureRetryTime();
 
     //give us 60 seconds to process the message
-    @Default("60")
-    @Key(INDEX_QUEUE_READ_TIMEOUT)
+    @Default( "60" )
+    @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 43eaa01..7d8a859 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -16,69 +16,84 @@
  */
 package org.apache.usergrid.persistence.index;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.BatchRequest;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 
 import java.io.Serializable;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 
 /**
  * Container for index operations.
  */
-public  class IndexOperationMessage implements Serializable {
-    private final Set<BatchRequest> builders;
+public class IndexOperationMessage implements Serializable {
+    private final Set<IndexRequest> indexRequests;
+    private final Set<DeIndexRequest> deIndexRequests;
+
+
+
     private final BetterFuture<IndexOperationMessage> containerFuture;
 
-    public IndexOperationMessage(){
+
+    public IndexOperationMessage() {
         final IndexOperationMessage parent = this;
-        this.builders = new HashSet<>();
-        this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+        this.indexRequests = new HashSet<>();
+        this.deIndexRequests = new HashSet<>();
+        this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
             @Override
             public IndexOperationMessage call() throws Exception {
                 return parent;
             }
-        });
+        } );
     }
 
 
-    /**
-     * Add all our operations in the set
-     * @param requests
-     */
-    public void setOperations(final Set<BatchRequest> requests){
-        this.builders.addAll( requests);
+    public void addIndexRequest( final IndexRequest indexRequest ) {
+        indexRequests.add( indexRequest );
     }
 
 
-    /**
-     * Add the operation to the set
-     * @param builder
-     */
-    public void addOperation(BatchRequest builder){
-        builders.add(builder);
+    public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
+        this.indexRequests.addAll( indexRequests ); // "this." is required: the parameter shadows the field
     }
 
-    /**
-     * return operations for the message
-     * @return
-     */
-    public Set<BatchRequest> getOperations(){
-        return builders;
+
+    public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
+        deIndexRequests.add( deIndexRequest );
+    }
+
+
+    public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
+        this.deIndexRequests.addAll( deIndexRequests ); // "this." is required: the parameter shadows the field
+    }
+
+
+    public Set<IndexRequest> getIndexRequests() {
+        return indexRequests;
+    }
+
+
+    public Set<DeIndexRequest> getDeIndexRequests() {
+        return deIndexRequests;
+    }
+
+
+    @JsonIgnore
+    public boolean isEmpty(){
+        return indexRequests.isEmpty() && deIndexRequests.isEmpty();
     }
 
     /**
      * return the promise
-     * @return
      */
-    public BetterFuture<IndexOperationMessage> getFuture(){
+    @JsonIgnore
+    public BetterFuture<IndexOperationMessage> getFuture() {
         return containerFuture;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index ef0ef5f..1973e5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -60,8 +60,11 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
         //loop until we're we're full or we time out
         do {
             try {
+
+                final long remaining = endTime - System.currentTimeMillis();
+
                 //we received 1, try to drain
-                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+                IndexOperationMessage polled = messages.poll( remaining, TimeUnit.MILLISECONDS );
 
                 //drain
                 if ( polled != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index b814603..833e045 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -21,15 +21,11 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.elasticsearch.action.ActionRequestBuilder;
-
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -39,7 +35,6 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -69,13 +64,10 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
-        final Message toQueue = new Message( operation.getOperations() );
-
-
 
 
         try {
-            this.queue.sendMessage( toQueue );
+            this.queue.sendMessage( operation );
             operation.getFuture().run();
         }
         catch ( IOException e ) {
@@ -87,19 +79,22 @@ public class BufferQueueSQSImpl implements BufferQueue {
     @Override
     public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
 
-        //loop until we're we're full or we time out
+        //SQS doesn't support more than 10
+
+        final int actualTake = Math.min( 10, takeSize );
+
         List<QueueMessage> messages = queue
-            .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                Message.class );
+            .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                IndexOperationMessage.class );
 
 
         final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
         for ( final QueueMessage message : messages ) {
 
-            SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+            final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
 
-            operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+            SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message,  messageBody );
 
             response.add( operation );
         }
@@ -111,10 +106,15 @@ public class BufferQueueSQSImpl implements BufferQueue {
     @Override
     public void ack( final List<IndexOperationMessage> messages ) {
 
+        //nothing to do
+        if(messages.size() == 0){
+            return;
+        }
+
         List<QueueMessage> toAck = new ArrayList<>( messages.size() );
 
-        for(IndexOperationMessage ioe: messages){
-            toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+        for ( IndexOperationMessage ioe : messages ) {
+            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
         }
 
         queue.commitMessages( toAck );
@@ -122,22 +122,6 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
 
     /**
-     * The message to queue to SQS
-     */
-    public static final class Message implements Serializable {
-        private final Set<BatchRequest> data;
-
-
-        private Message( final Set<BatchRequest> data ) {this.data = data;}
-
-
-        public Set<BatchRequest> getData() {
-            return data;
-        }
-    }
-
-
-    /**
      * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
      */
     public class SqsIndexOperationMessage extends IndexOperationMessage {
@@ -145,7 +129,11 @@ public class BufferQueueSQSImpl implements BufferQueue {
         private final QueueMessage message;
 
 
-        public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+        public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+            this.message = message;
+            this.addAllDeIndexRequest( source.getDeIndexRequests() );
+            this.addAllIndexRequest( source.getIndexRequests() );
+        }
 
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index a279f16..c63c4df 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -26,10 +26,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
 
 /**
  * Represent the properties required to build an index request
  */
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public class DeIndexRequest implements BatchRequest {
 
     public final String[] indexes;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b0c731e..b63dfe6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
 
 
-        container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+        container.addIndexRequest(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
 
         return this;
     }
@@ -168,7 +168,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
             indexes = new String[]{indexIdentifier.getIndex(null)};
         }
 
-        container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+        container.addDeIndexRequest( new DeIndexRequest( indexes, entityType, indexId ) );
 
         log.debug("Deindexed Entity with index id " + indexId);
 
@@ -192,6 +192,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public BetterFuture execute() {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
+
+        /**
+         * No-op, just disregard it
+         */
+        if(tempContainer.isEmpty()){
+            return tempContainer.getFuture();
+        }
+
         return indexBatchBufferProducer.put(tempContainer);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2342398..8547889 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -82,9 +82,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
+        this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
             @Override
-            public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+            public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
                 //name our thread so it's easy to see
                 Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
@@ -99,21 +99,22 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         drainList = bufferQueue
                             .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
 
-                        for ( IndexOperationMessage drained : drainList ) {
-                            subscriber.onNext( drained );
-                        }
 
-                        bufferQueue.ack( drainList );
+                        subscriber.onNext( drainList );
+
 
                         timer.stop();
 
                         countFail.set( 0 );
                     }
-                    catch( EsRejectedExecutionException err)  {
+                    catch ( EsRejectedExecutionException err ) {
                         countFail.incrementAndGet();
-                        log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+                        log.error(
+                            "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  "
+                                + "Failed {} consecutive times",
+                            config.getFailureRetryTime(), countFail.get() );
 
-                       //es  rejected the exception, sleep and retry in the queue
+                        //es rejected the request, sleep and retry in the queue
                         try {
                             Thread.sleep( config.getFailureRetryTime() );
                         }
@@ -131,8 +132,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 }
                 while ( true );
             }
-        } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
-            config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+        } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
             @Override
             public void call( List<IndexOperationMessage> containerList ) {
                 if ( containerList.size() > 0 ) {
@@ -142,7 +142,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     time.stop();
                 }
             }
-        } );
+} )
+            //ack after we process
+          .doOnNext( new Action1<List<IndexOperationMessage>>() {
+              @Override
+              public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+                  bufferQueue.ack( indexOperationMessages );
+              }
+          } );
 
         //start in the background
         consumer.subscribe();
@@ -163,15 +170,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
                 @Override
                 public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
-                    return Observable.from( operationMessage.getOperations() );
+                    final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
+                    final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
+
+                    return Observable.merge( deIndex, index );
                 }
             } );
 
 
 
         //batch shard operations into a bulk request
-        flattenMessages
-            .buffer(config.getIndexBatchSize())
+        flattenMessages.toList()
             .doOnNext(new Action1<List<BatchRequest>>() {
                 @Override
                 public void call(List<BatchRequest> builders) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index db1f50e..61d5d25 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -54,7 +54,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
 
     public BetterFuture put(IndexOperationMessage message){
         Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc(message.getOperations().size());
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
         Timer.Context time = timer.time();
         bufferQueue.offer( message );
         time.stop();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
index 381d005..4ec4092 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -26,17 +26,20 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 
 /**
  * Represent the properties required to build an index request
  */
+@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
 public class IndexRequest implements BatchRequest {
 
-    public final  String writeAlias;
-    public final String entityType;
-    public final String documentId;
+    public String writeAlias;
+    public String entityType;
+    public String documentId;
 
-    public final Map<String, Object> data;
+    public Map<String, Object> data;
 
 
     public IndexRequest( final String writeAlias, final String entityType, final String documentId,
@@ -48,13 +51,18 @@ public class IndexRequest implements BatchRequest {
     }
 
 
-    public void  doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
-        IndexRequestBuilder builder =
-                      client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+    /**
+     * DO NOT DELETE!  Required for Jackson
+     */
+    public IndexRequest() {
+    }
 
 
-        bulkRequest.add( builder );
+    public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {
+        IndexRequestBuilder builder = client.prepareIndex( writeAlias, entityType, documentId ).setSource( data );
 
+
+        bulkRequest.add( builder );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 37b5e90..215ff57 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
         EsTestUtils.waitForTasks(personLikesIndex);
-        Thread.sleep( 1000 );
+        Thread.sleep( 30000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 1000 );
+        Thread.sleep( 30000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..10aa621 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -95,7 +95,8 @@ public class SQSQueueManagerImpl implements QueueManager {
             sqs.setRegion(region);
             smileFactory.delegateToTextual(true);
             mapper = new ObjectMapper( smileFactory );
-            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+            //pretty print, disabling for speed
+//            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
             LOG.warn("failed to setup SQS",e);
@@ -180,7 +181,10 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
         String url = getQueue().getUrl();
         LOG.info("Sending Message...{} to {}",body.toString(),url);
-        SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+
+        final String stringBody = toString(body);
+
+        SendMessageRequest request = new SendMessageRequest(url, stringBody);
         sqs.sendMessage(request);
     }