You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:38 UTC

[10/50] incubator-usergrid git commit: Changed consumer structure of buffer/wait timeout. This now happens implicitly in our queue take, and is no longer necessary.

Changed consumer structure of buffer/wait timeout.  This now happens implicitly in our queue take, and is no longer necessary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9111d944
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9111d944
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9111d944

Branch: refs/heads/two-dot-o
Commit: 9111d94481bc15490b477f9ca48dd2565ca0e9dd
Parents: c47f32a
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 17:25:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 17:25:23 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |  9 ++-
 .../usergrid/persistence/index/IndexFig.java    | 47 ++++++-----
 .../index/IndexOperationMessage.java            | 83 ++++++++++++--------
 .../index/impl/BufferQueueInMemoryImpl.java     |  5 +-
 .../index/impl/BufferQueueSQSImpl.java          | 54 +++++--------
 .../persistence/index/impl/DeIndexRequest.java  |  5 ++
 .../index/impl/EsEntityIndexBatchImpl.java      | 12 ++-
 .../index/impl/EsIndexBufferConsumerImpl.java   | 39 +++++----
 .../index/impl/EsIndexBufferProducerImpl.java   |  3 +-
 .../persistence/index/impl/IndexRequest.java    | 24 ++++--
 .../impl/EntityConnectionIndexImplTest.java     |  4 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  8 +-
 12 files changed, 174 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 8d99586..7b53d67 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
 import org.apache.usergrid.persistence.index.guice.IndexModule;
+import org.apache.usergrid.persistence.index.impl.BufferQueue;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.slf4j.Logger;
@@ -69,7 +71,12 @@ public class CoreModule  extends AbstractModule {
         install( new CommonModule());
         install(new CollectionModule());
         install(new GraphModule());
-        install(new IndexModule());
+        install( new IndexModule() {
+            @Override
+            public void wireBufferQueue() {
+                bind(BufferQueue.class).to( BufferQueueSQSImpl.class );
+            }
+        } );
         install(new MapModule());
         install(new QueueModule());
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index ce14449..cde86fd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -18,6 +18,7 @@
  */
 package org.apache.usergrid.persistence.index;
 
+
 import org.safehaus.guicyfig.Default;
 import org.safehaus.guicyfig.FigSingleton;
 import org.safehaus.guicyfig.GuicyFig;
@@ -55,8 +56,16 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
+    /**
+     * Amount of time to wait when reading from the queue
+     */
     public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
 
+    /**
+     * Amount of time to wait when reading from the queue in milliseconds
+     */
+    public static final String INDEX_QUEUE_TRANSACTION_TIMEOUT = "elasticsearch.queue_transaction_timeout";
+
     public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
     public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -67,9 +76,10 @@ public interface IndexFig extends GuicyFig {
     public static final String ELASTICSEARCH_FAIL_REFRESH = "elasticsearch.fail_refresh";
 
     /**
-     *  Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple backpressure
+     * Amount of time in milliseconds to wait when ES rejects our request before retrying.  Provides simple
+     * backpressure
      */
-    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME =  "elasticsearch.rejected_retry_wait";
+    public static final String FAILURE_REJECTED_RETRY_WAIT_TIME = "elasticsearch.rejected_retry_wait";
 
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
@@ -82,7 +92,7 @@ public interface IndexFig extends GuicyFig {
     int getPort();
 
     @Default( "usergrid" )
-    @Key( ELASTICSEARCH_CLUSTER_NAME)
+    @Key( ELASTICSEARCH_CLUSTER_NAME )
     String getClusterName();
 
     @Default( "usergrid" ) // no underbars allowed
@@ -111,15 +121,15 @@ public interface IndexFig extends GuicyFig {
     public boolean isForcedRefresh();
 
     /** Identify the client node with a unique name. */
-    @Default("default")
+    @Default( "default" )
     @Key( ELASTICSEARCH_NODENAME )
     public String getNodeName();
 
-    @Default("6")
+    @Default( "6" )
     @Key( ELASTICSEARCH_NUMBER_OF_SHARDS )
     public int getNumberOfShards();
 
-    @Default("1")
+    @Default( "1" )
     @Key( ELASTICSEARCH_NUMBER_OF_REPLICAS )
     public int getNumberOfReplicas();
 
@@ -127,51 +137,48 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_FAIL_REFRESH )
     int getFailRefreshCount();
 
-    @Default("2")
+    @Default( "2" )
     int getIndexCacheMaxWorkers();
 
     /**
      * how long to wait before the buffer flushes to send
-     * @return
      */
-    @Default("250")
+    @Default( "250" )
     @Key( INDEX_BUFFER_TIMEOUT )
     long getIndexBufferTimeout();
 
     /**
      * size of the buffer to build up before you send results
-     * @return
      */
-    @Default("1000")
+    @Default( "1000" )
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
 
     /**
      * size of the buffer to build up before you send results
-     * @return
      */
-    @Default("1000")
+    @Default( "1000" )
     @Key( INDEX_QUEUE_SIZE )
     int getIndexQueueSize();
 
     /**
      * Request batch size for ES
-     * @return
      */
-    @Default("1000")
-    @Key( INDEX_BATCH_SIZE)
+    @Default( "1000" )
+    @Key( INDEX_BATCH_SIZE )
     int getIndexBatchSize();
 
-    @Default("one")
+    @Default( "one" )
     @Key( INDEX_WRITE_CONSISTENCY_LEVEL )
     String getWriteConsistencyLevel();
 
-    @Default("1000")
+    @Default( "1000" )
     @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
     long getFailureRetryTime();
 
     //give us 60 seconds to process the message
-    @Default("60")
-    @Key(INDEX_QUEUE_READ_TIMEOUT)
+    @Default( "60" )
+    @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 43eaa01..7d8a859 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -16,69 +16,84 @@
  */
 package org.apache.usergrid.persistence.index;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.BatchRequest;
-
-import org.elasticsearch.action.ActionRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 
 import java.io.Serializable;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.DeIndexRequest;
+import org.apache.usergrid.persistence.index.impl.IndexRequest;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 
 /**
  * Container for index operations.
  */
-public  class IndexOperationMessage implements Serializable {
-    private final Set<BatchRequest> builders;
+public class IndexOperationMessage implements Serializable {
+    private final Set<IndexRequest> indexRequests;
+    private final Set<DeIndexRequest> deIndexRequests;
+
+
+
     private final BetterFuture<IndexOperationMessage> containerFuture;
 
-    public IndexOperationMessage(){
+
+    public IndexOperationMessage() {
         final IndexOperationMessage parent = this;
-        this.builders = new HashSet<>();
-        this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+        this.indexRequests = new HashSet<>();
+        this.deIndexRequests = new HashSet<>();
+        this.containerFuture = new BetterFuture<>( new Callable<IndexOperationMessage>() {
             @Override
             public IndexOperationMessage call() throws Exception {
                 return parent;
             }
-        });
+        } );
     }
 
 
-    /**
-     * Add all our operations in the set
-     * @param requests
-     */
-    public void setOperations(final Set<BatchRequest> requests){
-        this.builders.addAll( requests);
+    public void addIndexRequest( final IndexRequest indexRequest ) {
+        indexRequests.add( indexRequest );
     }
 
 
-    /**
-     * Add the operation to the set
-     * @param builder
-     */
-    public void addOperation(BatchRequest builder){
-        builders.add(builder);
+    public void addAllIndexRequest( final Set<IndexRequest> indexRequests ) {
+        indexRequests.addAll( indexRequests );
     }
 
-    /**
-     * return operations for the message
-     * @return
-     */
-    public Set<BatchRequest> getOperations(){
-        return builders;
+
+    public void addDeIndexRequest( final DeIndexRequest deIndexRequest ) {
+        deIndexRequests.add( deIndexRequest );
+    }
+
+
+    public void addAllDeIndexRequest( final Set<DeIndexRequest> deIndexRequests ) {
+        deIndexRequests.addAll( deIndexRequests );
+    }
+
+
+    public Set<IndexRequest> getIndexRequests() {
+        return indexRequests;
+    }
+
+
+    public Set<DeIndexRequest> getDeIndexRequests() {
+        return deIndexRequests;
+    }
+
+
+    @JsonIgnore
+    public boolean isEmpty(){
+        return indexRequests.isEmpty() && deIndexRequests.isEmpty();
     }
 
     /**
      * return the promise
-     * @return
      */
-    public BetterFuture<IndexOperationMessage> getFuture(){
+    @JsonIgnore
+    public BetterFuture<IndexOperationMessage> getFuture() {
         return containerFuture;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index ef0ef5f..1973e5d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -60,8 +60,11 @@ public class BufferQueueInMemoryImpl implements BufferQueue {
         //loop until we're we're full or we time out
         do {
             try {
+
+                final long remaining = endTime - System.currentTimeMillis();
+
                 //we received 1, try to drain
-                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+                IndexOperationMessage polled = messages.poll( remaining, timeUnit );
 
                 //drain
                 if ( polled != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
index b814603..833e045 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -21,15 +21,11 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.elasticsearch.action.ActionRequestBuilder;
-
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -39,7 +35,6 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
-import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -69,13 +64,10 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
-        final Message toQueue = new Message( operation.getOperations() );
-
-
 
 
         try {
-            this.queue.sendMessage( toQueue );
+            this.queue.sendMessage( operation );
             operation.getFuture().run();
         }
         catch ( IOException e ) {
@@ -87,19 +79,22 @@ public class BufferQueueSQSImpl implements BufferQueue {
     @Override
     public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
 
-        //loop until we're we're full or we time out
+        //SQS doesn't support more than 10
+
+        final int actualTake = Math.min( 10, takeSize );
+
         List<QueueMessage> messages = queue
-            .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
-                Message.class );
+            .getMessages( actualTake, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                IndexOperationMessage.class );
 
 
         final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
 
         for ( final QueueMessage message : messages ) {
 
-            SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+            final IndexOperationMessage messageBody = ( IndexOperationMessage ) message.getBody();
 
-            operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+            SqsIndexOperationMessage operation = new SqsIndexOperationMessage(message,  messageBody );
 
             response.add( operation );
         }
@@ -111,10 +106,15 @@ public class BufferQueueSQSImpl implements BufferQueue {
     @Override
     public void ack( final List<IndexOperationMessage> messages ) {
 
+        //nothing to do
+        if(messages.size() == 0){
+            return;
+        }
+
         List<QueueMessage> toAck = new ArrayList<>( messages.size() );
 
-        for(IndexOperationMessage ioe: messages){
-            toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+        for ( IndexOperationMessage ioe : messages ) {
+            toAck.add( ( ( SqsIndexOperationMessage ) ioe ).getMessage() );
         }
 
         queue.commitMessages( toAck );
@@ -122,22 +122,6 @@ public class BufferQueueSQSImpl implements BufferQueue {
 
 
     /**
-     * The message to queue to SQS
-     */
-    public static final class Message implements Serializable {
-        private final Set<BatchRequest> data;
-
-
-        private Message( final Set<BatchRequest> data ) {this.data = data;}
-
-
-        public Set<BatchRequest> getData() {
-            return data;
-        }
-    }
-
-
-    /**
      * The message that subclasses our IndexOperationMessage.  holds a pointer to the original message
      */
     public class SqsIndexOperationMessage extends IndexOperationMessage {
@@ -145,7 +129,11 @@ public class BufferQueueSQSImpl implements BufferQueue {
         private final QueueMessage message;
 
 
-        public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+        public SqsIndexOperationMessage( final QueueMessage message, final IndexOperationMessage source ) {
+            this.message = message;
+            this.addAllDeIndexRequest( source.getDeIndexRequests() );
+            this.addAllIndexRequest( source.getIndexRequests() );
+        }
 
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
index a279f16..c63c4df 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -26,10 +26,15 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
 
 /**
  * Represent the properties required to build an index request
  */
+@JsonTypeInfo(use=JsonTypeInfo.Id.CLASS, include=JsonTypeInfo.As.PROPERTY, property="@class")
 public class DeIndexRequest implements BatchRequest {
 
     public final String[] indexes;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b0c731e..b63dfe6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -127,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         final String entityType = entity.getId().getType();
 
 
-        container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+        container.addIndexRequest(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
 
         return this;
     }
@@ -168,7 +168,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
             indexes = new String[]{indexIdentifier.getIndex(null)};
         }
 
-        container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+        container.addDeIndexRequest( new DeIndexRequest( indexes, entityType, indexId ) );
 
         log.debug("Deindexed Entity with index id " + indexId);
 
@@ -192,6 +192,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     public BetterFuture execute() {
         IndexOperationMessage tempContainer = container;
         container = new IndexOperationMessage();
+
+        /**
+         * No-op, just disregard it
+         */
+        if(tempContainer.isEmpty()){
+            return tempContainer.getFuture();
+        }
+
         return indexBatchBufferProducer.put(tempContainer);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 2342398..8547889 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -82,9 +82,9 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         final AtomicInteger countFail = new AtomicInteger();
         //batch up sets of some size and send them in batch
-        this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
+        this.consumer = Observable.create( new Observable.OnSubscribe<List<IndexOperationMessage>>() {
             @Override
-            public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+            public void call( final Subscriber<? super List<IndexOperationMessage>> subscriber ) {
 
                 //name our thread so it's easy to see
                 Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
@@ -99,21 +99,22 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         drainList = bufferQueue
                             .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS );
 
-                        for ( IndexOperationMessage drained : drainList ) {
-                            subscriber.onNext( drained );
-                        }
 
-                        bufferQueue.ack( drainList );
+                        subscriber.onNext( drainList );
+
 
                         timer.stop();
 
                         countFail.set( 0 );
                     }
-                    catch( EsRejectedExecutionException err)  {
+                    catch ( EsRejectedExecutionException err ) {
                         countFail.incrementAndGet();
-                        log.error( "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  Failed {} consecutive times", config.getFailRefreshCount(), countFail.get() );
+                        log.error(
+                            "Elasticsearch rejected our request, sleeping for {} milliseconds before retrying.  "
+                                + "Failed {} consecutive times",
+                            config.getFailRefreshCount(), countFail.get() );
 
-                       //es  rejected the exception, sleep and retry in the queue
+                        //es  rejected the exception, sleep and retry in the queue
                         try {
                             Thread.sleep( config.getFailureRetryTime() );
                         }
@@ -131,8 +132,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 }
                 while ( true );
             }
-        } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
-            config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+        } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
             @Override
             public void call( List<IndexOperationMessage> containerList ) {
                 if ( containerList.size() > 0 ) {
@@ -142,7 +142,14 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     time.stop();
                 }
             }
-        } );
+} )
+            //ack after we process
+          .doOnNext( new Action1<List<IndexOperationMessage>>() {
+              @Override
+              public void call( final List<IndexOperationMessage> indexOperationMessages ) {
+                  bufferQueue.ack( indexOperationMessages );
+              }
+          } );
 
         //start in the background
         consumer.subscribe();
@@ -163,15 +170,17 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
             .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
                 @Override
                 public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
-                    return Observable.from( operationMessage.getOperations() );
+                    final Observable<DeIndexRequest> deIndex = Observable.from( operationMessage.getDeIndexRequests () );
+                    final Observable<IndexRequest> index = Observable.from( operationMessage.getIndexRequests() );
+
+                    return Observable.merge( deIndex, index );
                 }
             } );
 
 
 
         //batch shard operations into a bulk request
-        flattenMessages
-            .buffer(config.getIndexBatchSize())
+        flattenMessages.toList()
             .doOnNext(new Action1<List<BatchRequest>>() {
                 @Override
                 public void call(List<BatchRequest> builders) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
index db1f50e..61d5d25 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferProducerImpl.java
@@ -54,7 +54,8 @@ public class EsIndexBufferProducerImpl implements IndexBufferProducer {
 
     public BetterFuture put(IndexOperationMessage message){
         Preconditions.checkNotNull(message, "Message cannot be null");
-        indexSizeCounter.inc(message.getOperations().size());
+        indexSizeCounter.inc(message.getDeIndexRequests().size());
+        indexSizeCounter.inc(message.getIndexRequests().size());
         Timer.Context time = timer.time();
         bufferQueue.offer( message );
         time.stop();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
index 381d005..4ec4092 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -26,17 +26,20 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 
 /**
  * Represent the properties required to build an index request
  */
+@JsonTypeInfo( use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class" )
 public class IndexRequest implements BatchRequest {
 
-    public final  String writeAlias;
-    public final String entityType;
-    public final String documentId;
+    public String writeAlias;
+    public String entityType;
+    public String documentId;
 
-    public final Map<String, Object> data;
+    public Map<String, Object> data;
 
 
     public IndexRequest( final String writeAlias, final String entityType, final String documentId,
@@ -48,13 +51,18 @@ public class IndexRequest implements BatchRequest {
     }
 
 
-    public void  doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
-        IndexRequestBuilder builder =
-                      client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+    /**
+     * DO NOT DELETE!  Required for Jackson
+     */
+    public IndexRequest() {
+    }
 
 
-        bulkRequest.add( builder );
+    public void doOperation( final Client client, final BulkRequestBuilder bulkRequest ) {
+        IndexRequestBuilder builder = client.prepareIndex( writeAlias, entityType, documentId ).setSource( data );
 
+
+        bulkRequest.add( builder );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 37b5e90..215ff57 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -141,7 +141,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
         EsTestUtils.waitForTasks(personLikesIndex);
-        Thread.sleep( 1000 );
+        Thread.sleep( 30000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
@@ -271,7 +271,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
-        Thread.sleep( 1000 );
+        Thread.sleep( 30000 );
 
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9111d944/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..10aa621 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -95,7 +95,8 @@ public class SQSQueueManagerImpl implements QueueManager {
             sqs.setRegion(region);
             smileFactory.delegateToTextual(true);
             mapper = new ObjectMapper( smileFactory );
-            mapper.enable(SerializationFeature.INDENT_OUTPUT);
+            //pretty print, disabling for speed
+//            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
         } catch ( Exception e ) {
             LOG.warn("failed to setup SQS",e);
@@ -180,7 +181,10 @@ public class SQSQueueManagerImpl implements QueueManager {
         }
         String url = getQueue().getUrl();
         LOG.info("Sending Message...{} to {}",body.toString(),url);
-        SendMessageRequest request = new SendMessageRequest(url,toString((Serializable)body));
+
+        final String stringBody = toString(body);
+
+        SendMessageRequest request = new SendMessageRequest(url, stringBody);
         sqs.sendMessage(request);
     }