You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/17 00:37:36 UTC

[08/50] incubator-usergrid git commit: Refactored index messages to be serializable.

Refactored index messages to be serializable.

Consumers now roll up index requests and flush them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23919745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23919745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23919745

Branch: refs/heads/two-dot-o
Commit: 23919745883ff9d60c18ce75fc1a056a017bce37
Parents: cd0015d
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 10 16:12:34 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Mar 10 16:12:34 2015 -0600

----------------------------------------------------------------------
 stack/corepersistence/queryindex/pom.xml        |   8 +
 .../usergrid/persistence/index/IndexFig.java    |   7 +
 .../index/IndexOperationMessage.java            |  29 +++-
 .../persistence/index/guice/IndexModule.java    |   2 +
 .../persistence/index/impl/BatchRequest.java    |  41 +++++
 .../index/impl/BufferQueueInMemory.java         |  87 ----------
 .../index/impl/BufferQueueInMemoryImpl.java     |  87 ++++++++++
 .../index/impl/BufferQueueSQSImpl.java          | 158 +++++++++++++++++++
 .../persistence/index/impl/DeIndexRequest.java  | 106 +++++++++++++
 .../index/impl/EsEntityIndexBatchImpl.java      |  39 ++---
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  37 ++---
 .../persistence/index/impl/IndexRequest.java    | 117 ++++++++++++++
 .../index/guice/TestIndexModule.java            |   6 +-
 .../persistence/queue/QueueManager.java         |   4 +-
 15 files changed, 581 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index f6ae718..a5fbf6a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -108,6 +108,14 @@
         </dependency>
 
         <dependency>
+                  <groupId>${project.parent.groupId}</groupId>
+                  <artifactId>queue</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+              </dependency>
+
+
+        <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
             <version>${elasticsearch.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index befbaa9..ce14449 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -55,6 +55,8 @@ public interface IndexFig extends GuicyFig {
 
     public static final String INDEX_BUFFER_TIMEOUT = "elasticsearch.buffer_timeout";
 
+    public static final String INDEX_QUEUE_READ_TIMEOUT = "elasticsearch.queue_read_timeout";
+
     public static final String INDEX_BATCH_SIZE = "elasticsearch.batch_size";
 
     public static final String INDEX_WRITE_CONSISTENCY_LEVEL = "elasticsearch.write_consistency_level";
@@ -167,4 +169,9 @@ public interface IndexFig extends GuicyFig {
     @Default("1000")
     @Key( FAILURE_REJECTED_RETRY_WAIT_TIME )
     long getFailureRetryTime();
+
+    //give us 60 seconds to process the message
+    @Default("60")
+    @Key(INDEX_QUEUE_READ_TIMEOUT)
+    int getIndexQueueTimeout();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 944a71f..43eaa01 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,23 +17,28 @@
 package org.apache.usergrid.persistence.index;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.apache.usergrid.persistence.index.impl.BatchRequest;
+
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 
+import java.io.Serializable;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Container for index operations.
  */
-public  class IndexOperationMessage {
-    private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
+public  class IndexOperationMessage implements Serializable {
+    private final Set<BatchRequest> builders;
     private final BetterFuture<IndexOperationMessage> containerFuture;
 
     public IndexOperationMessage(){
         final IndexOperationMessage parent = this;
-        builders = new ConcurrentLinkedQueue<>();
+        this.builders = new HashSet<>();
         this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
             @Override
             public IndexOperationMessage call() throws Exception {
@@ -42,7 +47,21 @@ public  class IndexOperationMessage {
         });
     }
 
-    public void addOperation(ActionRequestBuilder builder){
+
+    /**
+     * Add all of the given operations to our set
+     * @param requests
+     */
+    public void setOperations(final Set<BatchRequest> requests){
+        this.builders.addAll( requests);
+    }
+
+
+    /**
+     * Add the operation to the set
+     * @param builder
+     */
+    public void addOperation(BatchRequest builder){
         builders.add(builder);
     }
 
@@ -50,7 +69,7 @@ public  class IndexOperationMessage {
      * return operations for the message
      * @return
      */
-    public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
+    public Set<BatchRequest> getOperations(){
         return builders;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index d911dab..b03e1c0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
 import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
 
 import org.safehaus.guicyfig.GuicyFigModule;
 
@@ -41,6 +42,7 @@ public abstract class IndexModule extends AbstractModule {
         install(new GuicyFigModule(IndexFig.class));
 
         install(new MapModule());
+        install(new QueueModule());
 
 
         bind(EntityIndexFactory.class).to( EsEntityIndexFactoryImpl.class );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
new file mode 100644
index 0000000..df6c5b0
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BatchRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.Serializable;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * A batch request we can serialize and construct on receive
+ */
+public interface BatchRequest extends Serializable {
+
+
+    /**
+     * Passing the client and the bulk request, add ourselves to the bulk request
+     * @param client
+     * @param bulkRequest
+     */
+    public void doOperation(final Client client, final BulkRequestBuilder bulkRequest );
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
deleted file mode 100644
index 403762f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemory.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-
-@Singleton
-public class BufferQueueInMemory implements BufferQueue {
-
-    private final ArrayBlockingQueue<IndexOperationMessage> messages;
-
-
-    @Inject
-    public BufferQueueInMemory(final IndexFig fig ) {
-        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
-    }
-
-
-    @Override
-    public void offer( final IndexOperationMessage operation ) {
-        messages.offer( operation );
-    }
-
-
-    @Override
-    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
-
-        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
-
-        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
-
-        //loop until we're we're full or we time out
-        do {
-            try {
-                //we received 1, try to drain
-                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
-
-                //drain
-                if ( polled != null ) {
-                    response.add( polled );
-                    messages.drainTo( response, takeSize - response.size() );
-                }
-            }
-            catch ( InterruptedException ie ) {
-                //swallow
-
-            }
-        }
-        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
-
-        return response;
-    }
-
-
-    @Override
-    public void ack( final List<IndexOperationMessage> messages ) {
-         //no op for this
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
new file mode 100644
index 0000000..ef0ef5f
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueInMemoryImpl implements BufferQueue {
+
+    private final ArrayBlockingQueue<IndexOperationMessage> messages;
+
+
+    @Inject
+    public BufferQueueInMemoryImpl( final IndexFig fig ) {
+        messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        messages.offer( operation );
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        final List<IndexOperationMessage> response = new ArrayList<>( takeSize );
+
+        final long endTime = System.currentTimeMillis() + timeUnit.toMillis( timeout );
+
+        //loop until we're full or we time out
+        do {
+            try {
+                //we received 1, try to drain
+                IndexOperationMessage polled = messages.poll( timeout, timeUnit );
+
+                //drain
+                if ( polled != null ) {
+                    response.add( polled );
+                    messages.drainTo( response, takeSize - response.size() );
+                }
+            }
+            catch ( InterruptedException ie ) {
+                //swallow
+
+            }
+        }
+        while ( response.size() < takeSize && System.currentTimeMillis() < endTime );
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+         //no op for this
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
new file mode 100644
index 0000000..b814603
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueSQSImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.elasticsearch.action.ActionRequestBuilder;
+
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+@Singleton
+public class BufferQueueSQSImpl implements BufferQueue {
+
+    /** Hacky, copied from CPEntityManager b/c we can't access it here */
+    public static final UUID MANAGEMENT_APPLICATION_ID = UUID.fromString( "b6768a08-b5d5-11e3-a495-11ddb1de66c8" );
+
+
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexFig indexFig;
+
+
+    @Inject
+    public BufferQueueSQSImpl( final QueueManagerFactory queueManagerFactory, final IndexFig indexFig ) {
+        final QueueScope scope =
+            new QueueScopeImpl( new SimpleId( MANAGEMENT_APPLICATION_ID, "application" ), QUEUE_NAME );
+
+        this.queue = queueManagerFactory.getQueueManager( scope );
+        this.indexFig = indexFig;
+    }
+
+
+    @Override
+    public void offer( final IndexOperationMessage operation ) {
+        final Message toQueue = new Message( operation.getOperations() );
+
+
+
+
+        try {
+            this.queue.sendMessage( toQueue );
+            operation.getFuture().run();
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to queue message", e );
+        }
+    }
+
+
+    @Override
+    public List<IndexOperationMessage> take( final int takeSize, final long timeout, final TimeUnit timeUnit ) {
+
+        //request takeSize messages from the queue in a single call
+        List<QueueMessage> messages = queue
+            .getMessages( takeSize, indexFig.getIndexQueueTimeout(), ( int ) timeUnit.toMillis( timeout ),
+                Message.class );
+
+
+        final List<IndexOperationMessage> response = new ArrayList<>( messages.size() );
+
+        for ( final QueueMessage message : messages ) {
+
+            SqsIndexOperationMessage operation = new SqsIndexOperationMessage( message );
+
+            operation.setOperations( ( ( Message ) message.getBody() ).getData() );
+
+            response.add( operation );
+        }
+
+        return response;
+    }
+
+
+    @Override
+    public void ack( final List<IndexOperationMessage> messages ) {
+
+        List<QueueMessage> toAck = new ArrayList<>( messages.size() );
+
+        for(IndexOperationMessage ioe: messages){
+            toAck.add( ((SqsIndexOperationMessage)ioe).getMessage() );
+        }
+
+        queue.commitMessages( toAck );
+    }
+
+
+    /**
+     * The message to queue to SQS
+     */
+    public static final class Message implements Serializable {
+        private final Set<BatchRequest> data;
+
+
+        private Message( final Set<BatchRequest> data ) {this.data = data;}
+
+
+        public Set<BatchRequest> getData() {
+            return data;
+        }
+    }
+
+
+    /**
+     * A subclass of IndexOperationMessage that holds a reference to the original queue message
+     */
+    public class SqsIndexOperationMessage extends IndexOperationMessage {
+
+        private final QueueMessage message;
+
+
+        public SqsIndexOperationMessage( final QueueMessage message ) {this.message = message;}
+
+
+        /**
+         * Get the message from our queue
+         */
+        public QueueMessage getMessage() {
+            return message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
new file mode 100644
index 0000000..a279f16
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/DeIndexRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Arrays;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represents the properties required to build a de-index (delete) request
+ */
+public class DeIndexRequest implements BatchRequest {
+
+    public final String[] indexes;
+    public final String entityType;
+    public final String documentId;
+
+
+    public DeIndexRequest( final String[] indexes, final String entityType, final String documentId) {
+        this.indexes = indexes;
+        this.entityType = entityType;
+        this.documentId = documentId;
+    }
+
+
+    @Override
+    public void doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+
+
+        for(final String index: indexes) {
+            final DeleteRequestBuilder builder = client.prepareDelete( index, entityType, documentId);
+
+            bulkRequest.add( builder );
+        }
+    }
+
+
+    public String[] getIndexes() {
+        return indexes;
+    }
+
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+
+    public String getDocumentId() {
+        return documentId;
+    }
+
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) {
+            return false;
+        }
+
+        final DeIndexRequest that = ( DeIndexRequest ) o;
+
+        if ( !documentId.equals( that.documentId ) ) {
+            return false;
+        }
+        if ( !entityType.equals( that.entityType ) ) {
+            return false;
+        }
+        if ( !Arrays.equals( indexes, that.indexes ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        int result = Arrays.hashCode( indexes );
+        result = 31 * result + entityType.hashCode();
+        result = 31 * result + documentId.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d987b29..b0c731e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -73,10 +73,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private final ApplicationScope applicationScope;
 
-    private final Client client;
-
-    private final boolean refresh;
-
     private final IndexIdentifier.IndexAlias alias;
     private final IndexIdentifier indexIdentifier;
 
@@ -85,21 +81,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     private final AliasedEntityIndex entityIndex;
     private IndexOperationMessage container;
 
-    private final Timer batchTimer;
 
 
-    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,
+    public EsEntityIndexBatchImpl(final ApplicationScope applicationScope,
                                   final IndexBufferProducer indexBatchBufferProducer,final IndexFig config,
-                                  final AliasedEntityIndex entityIndex,final MetricsFactory metricsFactory ) {
+                                  final AliasedEntityIndex entityIndex ) {
 
         this.applicationScope = applicationScope;
-        this.client = client;
         this.indexBatchBufferProducer = indexBatchBufferProducer;
         this.entityIndex = entityIndex;
         this.indexIdentifier = IndexingUtils.createIndexIdentifier(config, applicationScope);
         this.alias = indexIdentifier.getAlias();
-        this.refresh = config.isForcedRefresh();
-        this.batchTimer = metricsFactory.getTimer( EsEntityIndexBatchImpl.class, "entity.index.batch.timer" );
         //constrained
         this.container = new IndexOperationMessage();
     }
@@ -133,9 +125,10 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
         final String entityType = entity.getId().getType();
-        IndexRequestBuilder builder =
-                client.prepareIndex(alias.getWriteAlias(), entityType, indexId).setSource( entityAsMap );
-        container.addOperation(builder);
+
+
+        container.addOperation(new IndexRequest(alias.getWriteAlias(), entityType, indexId, entityAsMap));
+
         return this;
     }
 
@@ -174,23 +167,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         if(indexes == null ||indexes.length == 0){
             indexes = new String[]{indexIdentifier.getIndex(null)};
         }
-        //get all indexes then flush everyone
-        Timer.Context timeDeindex = batchTimer.time();
-        Observable.from(indexes)
-               .map(new Func1<String, Object>() {
-                   @Override
-                   public Object call(String index) {
-                       try {
-                           DeleteRequestBuilder builder = client.prepareDelete(index, entityType, indexId).setRefresh(refresh);
-                           container.addOperation(builder);
-                       }catch (Exception e){
-                           log.error("failed to deindex",e);
-                           throw e;
-                       }
-                       return index;
-                   }
-               }).toBlocking().last();
-        timeDeindex.stop();
+
+        container.addOperation( new DeIndexRequest( indexes, entityType, indexId ) );
+
         log.debug("Deindexed Entity with index id " + indexId);
 
         return this;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a1a8ca7..7bdb41a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -400,7 +400,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
     @Override
     public EntityIndexBatch createBatch() {
         EntityIndexBatch batch = new EsEntityIndexBatchImpl(
-                applicationScope, esProvider.getClient(),indexBatchBufferProducer, config, this, metricsFactory );
+                applicationScope, indexBatchBufferProducer, config, this );
         return batch;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 45c12a1..2342398 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -26,7 +26,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexBufferProducer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
 import org.elasticsearch.action.ActionRequestBuilder;
@@ -37,7 +36,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.slf4j.Logger;
@@ -48,7 +46,6 @@ import rx.functions.Action1;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -88,6 +85,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         this.consumer = Observable.create( new Observable.OnSubscribe<IndexOperationMessage>() {
             @Override
             public void call( final Subscriber<? super IndexOperationMessage> subscriber ) {
+
+                //name our thread so it's easy to see
+                Thread.currentThread().setName( "QueueConsumer_" + Thread.currentThread().getId() );
+
                 List<IndexOperationMessage> drainList;
                 do {
                     try {
@@ -130,7 +131,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                 }
                 while ( true );
             }
-        } ).subscribeOn( Schedulers.io() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
+        } ).subscribeOn( Schedulers.newThread() ).buffer( config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
             config.getIndexBufferSize() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
             @Override
             public void call( List<IndexOperationMessage> containerList ) {
@@ -157,37 +158,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
+        Observable<BatchRequest> flattenMessages = Observable.from(operationMessages)
             .subscribeOn(Schedulers.io())
-            .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
+            .flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
                 @Override
-                public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
-                    return Observable.from(operationMessage.getOperations());
+                public Observable<BatchRequest> call( IndexOperationMessage operationMessage ) {
+                    return Observable.from( operationMessage.getOperations() );
                 }
-            });
+            } );
 
 
 
         //batch shard operations into a bulk request
         flattenMessages
             .buffer(config.getIndexBatchSize())
-            .doOnNext(new Action1<List<ActionRequestBuilder>>() {
+            .doOnNext(new Action1<List<BatchRequest>>() {
                 @Override
-                public void call(List<ActionRequestBuilder> builders) {
+                public void call(List<BatchRequest> builders) {
                     try {
                         final BulkRequestBuilder bulkRequest = initRequest();
-                        for (ActionRequestBuilder builder : builders) {
+                        for (BatchRequest builder : builders) {
                             indexSizeCounter.dec();
-                            if (builder instanceof IndexRequestBuilder) {
-                                bulkRequest.add((IndexRequestBuilder) builder);
-                            }
-                            if (builder instanceof DeleteRequestBuilder) {
-                                bulkRequest.add((DeleteRequestBuilder) builder);
-                            }
-                            if(builder instanceof DeleteByQueryRequestBuilder){
-                                DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
-                                deleteByQueryRequestBuilder.get();
-                            }
+
+                            builder.doOperation( client, bulkRequest );
                         }
                         sendRequest(bulkRequest);
                     }catch (Exception e){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
new file mode 100644
index 0000000..381d005
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRequest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import java.util.Map;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+
+
+/**
+ * Represents the properties required to build an index request: the write alias, entity type, document id and source map of one document.
+ */
+public class IndexRequest implements BatchRequest {
+
+    public final  String writeAlias; // first argument to client.prepareIndex (the index/alias written to)
+    public final String entityType; // second argument to client.prepareIndex (the document type)
+    public final String documentId; // third argument to client.prepareIndex (the document id)
+
+    public final Map<String, Object> data; // document source handed to setSource
+
+    /** Stores the given values as-is; no copying or null checks are performed here. */
+    public IndexRequest( final String writeAlias, final String entityType, final String documentId,
+                         final Map<String, Object> data ) {
+        this.writeAlias = writeAlias;
+        this.entityType = entityType;
+        this.documentId = documentId;
+        this.data = data;
+    }
+
+    /** Builds an index request from the stored fields using {@code client} and adds it to {@code bulkRequest}. */
+    public void  doOperation(final Client client, final BulkRequestBuilder bulkRequest ){
+        IndexRequestBuilder builder =
+                      client.prepareIndex(writeAlias, entityType, documentId).setSource( data );
+
+        // only queue the request on the bulk builder; this method does not execute it
+        bulkRequest.add( builder );
+
+    }
+
+    /** @return the alias the document is written to */
+    public String getWriteAlias() {
+        return writeAlias;
+    }
+
+    /** @return the entity type used as the document type */
+    public String getEntityType() {
+        return entityType;
+    }
+
+    /** @return the id of the document to index */
+    public String getDocumentId() {
+        return documentId;
+    }
+
+    /** @return the source map held by this request (the same instance, not a copy) */
+    public Map<String, Object> getData() {
+        return data;
+    }
+
+    /** Equal only to another {@code IndexRequest} whose four fields are all equal; fields are dereferenced without null checks. */
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() ) { // exact class match, so a subclass instance never compares equal
+            return false;
+        }
+
+        final IndexRequest that = ( IndexRequest ) o;
+
+        if ( !data.equals( that.data ) ) {
+            return false;
+        }
+        if ( !documentId.equals( that.documentId ) ) {
+            return false;
+        }
+        if ( !entityType.equals( that.entityType ) ) {
+            return false;
+        }
+        if ( !writeAlias.equals( that.writeAlias ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /** Combines the hash codes of the same four fields compared in {@code equals}; fields are dereferenced without null checks. */
+    @Override
+    public int hashCode() {
+        int result = writeAlias.hashCode();
+        result = 31 * result + entityType.hashCode();
+        result = 31 * result + documentId.hashCode();
+        result = 31 * result + data.hashCode();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 7e2312d..57c2fab 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -23,7 +23,8 @@ import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.index.impl.BufferQueue;
-import org.apache.usergrid.persistence.index.impl.BufferQueueInMemory;
+import org.apache.usergrid.persistence.index.impl.BufferQueueInMemoryImpl;
+import org.apache.usergrid.persistence.index.impl.BufferQueueSQSImpl;
 
 
 public class TestIndexModule extends TestModule {
@@ -37,7 +38,8 @@ public class TestIndexModule extends TestModule {
         install( new IndexModule() {
             @Override
             public void wireBufferQueue() {
-                bind( BufferQueue.class).to( BufferQueueInMemory.class );
+                bind( BufferQueue.class).to( BufferQueueInMemoryImpl.class );
+//                bind( BufferQueue.class).to( BufferQueueSQSImpl.class );
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23919745/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 223860e..dd044d2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -29,8 +29,8 @@ public interface QueueManager {
     /**
      * Read messages from queue
      * @param limit
-     * @param transactionTimeout timeout in ms
-     * @param waitTime wait time for next message in ms
+     * @param transactionTimeout timeout in seconds
+     * @param waitTime wait time for next message in milliseconds
      * @param klass class to cast the return from
      * @return List of Queue Messages
      */