You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/03 17:41:11 UTC

[8/8] incubator-usergrid git commit: async delete by query

async delete by query


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/44c09894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/44c09894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/44c09894

Branch: refs/heads/USERGRID-432
Commit: 44c0989422ed0912f3e5ed5643f79910ac1057e6
Parents: 66f4bfb
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Mar 3 09:40:23 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Mar 3 09:40:23 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 21 ++++---
 .../usergrid/corepersistence/CpWalker.java      |  4 +-
 .../corepersistence/StaleIndexCleanupTest.java  | 14 ++---
 .../cassandra/EntityManagerFactoryImplIT.java   | 25 ++++----
 .../usergrid/persistence/index/EntityIndex.java |  6 +-
 .../index/IndexOperationMessage.java            |  7 ++-
 .../index/impl/EsEntityIndexImpl.java           | 65 +++++++++++++++-----
 .../index/impl/EsIndexBufferConsumerImpl.java   | 20 ++++--
 .../persistence/index/impl/EntityIndexTest.java |  2 +-
 stack/pom.xml                                   |  6 ++
 10 files changed, 113 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index bc227d8..6d97605 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -350,7 +350,7 @@ public class CpEntityManager implements EntityManager {
         Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
         CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
-                applicationScope.getApplication(),  entityRef.getType());
+            getApplicationScope().getApplication(),  entityRef.getType());
 
 
         //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
@@ -433,7 +433,7 @@ public class CpEntityManager implements EntityManager {
 
 
         CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
-                applicationScope.getApplication(),  type);
+            getApplicationScope().getApplication(),  type);
 
 
         //        if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
@@ -491,11 +491,16 @@ public class CpEntityManager implements EntityManager {
 
     @Override
     public void update( Entity entity ) throws Exception {
-
+        if(entity == null)
+            return;
+        Preconditions.checkNotNull("entity should never be null",entity);
+        String type = entity.getType();
+        Preconditions.checkNotNull("entity type should never be null",type);
+        Id appId  = getApplicationScope().getApplication();
+        Preconditions.checkNotNull("app scope should never be null",appId);
         // first, update entity index in its own collection scope
 
-        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
-                applicationScope.getApplication(),  entity.getType());
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(appId, type );
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
@@ -569,7 +574,7 @@ public class CpEntityManager implements EntityManager {
             return Observable.empty();
         }
         CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
-                applicationScope.getApplication(), entityRef.getType()  );
+            getApplicationScope().getApplication(), entityRef.getType()  );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
@@ -2122,7 +2127,7 @@ public class CpEntityManager implements EntityManager {
                                           final Object propertyValue ) {
 
         CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
-                applicationScope.getApplication(), collectionName);
+            getApplicationScope().getApplication(), collectionName);
 
         final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 
@@ -2446,7 +2451,7 @@ public class CpEntityManager implements EntityManager {
         org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityToCpEntity( entity, importId );
 
         // prepare to write and index Core Persistence Entity into default scope
-        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), eType);
+        CollectionScope collectionScope = getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), eType);
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 928b210..4b902d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -140,7 +140,9 @@ public class CpWalker {
                                 targetNodeEntityRef.getUuid() );
                             return;
                         }
-
+                        if(entity == null){
+                            return;
+                        }
                         String collName = CpNamingUtils.getCollectionName( edge.getType() );
                         visitor.visitCollectionEntry( em, collName, entity );
                     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 831e2ce..703a497 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -135,6 +135,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
     @Ignore("Broken until search connections is fixed")
     public void testStaleIndexCleanup() throws Exception {
 
+
         logger.info( "Started testStaleIndexCleanup()" );
 
         // turn off post processing stuff that cleans up stale entities
@@ -237,7 +238,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         // query for total number of result candidates = numEntities
         crs = queryCollectionCp( "things", "thing", "select *" );
-        Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size() );
+        Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size() );//20,21
     }
 
 
@@ -345,7 +346,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             things.add( em.create("thing", new HashMap<String, Object>() {{
                 put("name", thingName);
             }}));
-            Thread.sleep( writeDelayMs );
+//            Thread.sleep( writeDelayMs );
         }
         em.refreshIndex();
 
@@ -382,14 +383,11 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         // wait for indexes to be cleared for the deleted entities
         count = 0;
         do {
-            Thread.sleep(100);
+            if(count>0){Thread.sleep(200);}
             crs = queryCollectionCp("things", "thing", "select *");
-            em.refreshIndex();
-        } while ( crs.size() > 0 && count++ < 15 );
+        } while ( crs.size() == numEntities && count++ < 15 );
 
-        // query Core Persistence directly for total number of result candidates
-        crs = queryCollectionCp("things", "thing", "select *");
-        Assert.assertEquals( "Expect candidates without earlier stale entities", numEntities, crs.size() );
+        Assert.assertEquals("Expect candidates without earlier stale entities", crs.size(),numEntities);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index b4a67e8..0ca6e8b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -41,6 +41,9 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.setup.ConcurrentProcessSingleton;
 import rx.functions.Func0;
 import rx.functions.Func1;
+import rx.functions.Func2;
+
+import javax.annotation.concurrent.NotThreadSafe;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -49,7 +52,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-
+@NotThreadSafe
 public class EntityManagerFactoryImplIT extends AbstractCoreIT {
 
     private static final Logger logger = LoggerFactory.getLogger( EntityManagerFactoryImplIT.class );
@@ -124,13 +127,13 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
 
         em.refreshIndex();
 
-        Func1< Map<String, UUID> ,Boolean> findDeletedApps = new Func1<Map<String, UUID> ,Boolean>() {
+        Func2<UUID, Map<String, UUID> ,Boolean> findApps = new Func2<UUID,Map<String, UUID> ,Boolean>() {
             @Override
-            public Boolean call( Map<String, UUID> deletedApps) {
+            public Boolean call(UUID applicationId,  Map<String, UUID> apps) {
                 try {
                     boolean found = false;
-                    for (String appName : deletedApps.keySet()) {
-                        UUID appId = deletedApps.get(appName);
+                    for (String appName : apps.keySet()) {
+                        UUID appId = apps.get(appName);
                         if (appId.equals(applicationId)) {
                             found = true;
                             break;
@@ -144,8 +147,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         };
 
         boolean found = false;
-        for(int i=0;i<5;i++){
-            found = findDeletedApps.call(emf.getDeletedApplications());
+        for(int i=0;i<10;i++){
+            found = findApps.call(applicationId,emf.getDeletedApplications());
             if(found){
                 break;
             } else{
@@ -179,8 +182,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         // test to see that app now works and is happy
 
         // it should not be found in the deleted apps collection
-        for(int i=0;i<5;i++){
-            found = findDeletedApps.call(emf.getDeletedApplications());
+        for(int i=0;i<10;i++){
+            found = findApps.call(applicationId,emf.getDeletedApplications());
             if(!found){
                 break;
             } else{
@@ -189,8 +192,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
         }
         assertFalse("Restored app found in deleted apps collection", found);
 
-        for(int i=0;i<5;i++){
-            found = findDeletedApps.call(setup.getEmf().getApplications());
+        for(int i=0;i<10;i++){
+            found = findApps.call(applicationId,setup.getEmf().getApplications());
             if(!found){
                 break;
             } else{

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index d239641..b888e09 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -25,8 +25,10 @@ import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.elasticsearch.action.ListenableActionFuture;
 
 import java.util.Map;
+import java.util.concurrent.Future;
 
 
 /**
@@ -76,14 +78,14 @@ public interface EntityIndex {
      * effectively removing all versions of an entity from all index scopes
      * @param entityId The entityId to remove
      */
-    public void deleteAllVersionsOfEntity(final Id entityId );
+    public Future deleteAllVersionsOfEntity(final Id entityId );
 
     /**
      * Takes all the previous versions of the current entity and deletes all previous versions
      * @param id The id to remove
      * @param version The max version to retain
      */
-    public void deletePreviousVersions(final Id id, final UUID version);
+    public Future deletePreviousVersions(final Id id, final UUID version);
 
     /**
      * Refresh the index.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 501233e..944a71f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.persistence.index;
 
 import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 
 import java.util.Iterator;
@@ -27,7 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  * Container for index operations.
  */
 public  class IndexOperationMessage {
-    private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+    private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
     private final BetterFuture<IndexOperationMessage> containerFuture;
 
     public IndexOperationMessage(){
@@ -41,7 +42,7 @@ public  class IndexOperationMessage {
         });
     }
 
-    public void addOperation(ShardReplicationOperationRequestBuilder builder){
+    public void addOperation(ActionRequestBuilder builder){
         builders.add(builder);
     }
 
@@ -49,7 +50,7 @@ public  class IndexOperationMessage {
      * return operations for the message
      * @return
      */
-    public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getOperations(){
+    public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
         return builders;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index ad638c6..a8b0cc4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -19,6 +19,8 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
@@ -35,6 +37,8 @@ import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -444,7 +448,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                 try {
                     String[] indexes = ArrayUtils.addAll(
                         getIndexes(AliasType.Read),
-                        getIndexes(AliasType.Write) );
+                        getIndexes(AliasType.Write)
+                    );
 
                     if ( indexes.length == 0 ) {
                         logger.debug( "Not refreshing indexes, none found for app {}",
@@ -510,26 +515,38 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
 
     @Override
-    public void deleteAllVersionsOfEntity( Id entityId ) {
+    public ListenableActionFuture deleteAllVersionsOfEntity(final Id entityId ) {
 
         String idString = IndexingUtils.idString(entityId).toLowerCase();
 
         final TermQueryBuilder tqb = QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString );
 
-        final DeleteByQueryResponse response = esProvider.getClient()
-            .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute().actionGet();
+        final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+            .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
 
+        response.addListener(new ActionListener<DeleteByQueryResponse>() {
+            @Override
+            public void onResponse(DeleteByQueryResponse response) {
+                logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
+                    entityId.getType(), entityId.getUuid(), response.status().toString());
+
+                checkDeleteByQueryResponse( tqb, response );
+            }
 
-        logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
-            entityId.getType(), entityId.getUuid(), response.status().toString());
+            @Override
+            public void onFailure(Throwable e) {
+                logger.error("Deleted entity {}:{} from all index scopes with error {}",
+                    entityId.getType(), entityId.getUuid(), e);
 
-       checkDeleteByQueryResponse( tqb, response );
 
+            }
+        });
+        return response;
     }
 
 
     @Override
-    public void deletePreviousVersions( final Id entityId, final UUID version ) {
+    public ListenableActionFuture deletePreviousVersions( final Id entityId, final UUID version ) {
 
         String idString = IndexingUtils.idString( entityId ).toLowerCase();
 
@@ -538,15 +555,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
                 FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
         );
 
-        final DeleteByQueryResponse response = esProvider.getClient()
-            .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( fqb ).execute().actionGet();
+        final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+            .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
 
-        //error message needs to be retooled so that it describes the entity more throughly
-        logger.debug( "Deleted entity {}:{} with version {} from all "
-                + "index scopes with response status = {}",
-            entityId.getType(), entityId.getUuid(), version,  response.status().toString()  );
+        response.addListener(new ActionListener<DeleteByQueryResponse>() {
+            @Override
+            public void onResponse(DeleteByQueryResponse response) {
+                //error message needs to be retooled so that it describes the entity more thoroughly
+                logger.debug( "Deleted entity {}:{} with version {} from all "
+                        + "index scopes with response status = {}",
+                    entityId.getType(), entityId.getUuid(), version,  response.status().toString()  );
+
+                checkDeleteByQueryResponse( fqb, response );
+            }
+
+            @Override
+            public void onFailure(Throwable e) {
+                logger.error("Deleted entity {}:{} from all index scopes with error {}",
+                    entityId.getType(), entityId.getUuid(), e);
+            }
+        });
 
-        checkDeleteByQueryResponse( fqb, response );
+        return response;
     }
 
 
@@ -560,13 +590,14 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
 
             for ( ShardOperationFailedException failedException : failures ) {
-                throw new IndexException( String.format("Unable to delete by query %s. "
+                logger.error( String.format("Unable to delete by query %s. "
                         + "Failed with code %d and reason %s on shard %s in index %s",
                     query.toString(),
                     failedException.status().getStatus(),
                     failedException.reason(),
                     failedException.shardId(),
-                    failedException.index() ) );
+                    failedException.index() )
+                );
             }
 
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 09c7097..fbb7a40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -28,11 +28,13 @@ import org.apache.usergrid.persistence.index.IndexBufferConsumer;
 import org.apache.usergrid.persistence.index.IndexBufferProducer;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
 import org.elasticsearch.client.Client;
@@ -94,24 +96,26 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         }
 
         //process and flatten all the messages to builder requests
-        Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+        Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
             .subscribeOn(Schedulers.io())
-            .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+            .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
                 @Override
-                public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+                public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
                     return Observable.from(operationMessage.getOperations());
                 }
             });
 
+
+
         //batch shard operations into a bulk request
         flattenMessages
             .buffer(config.getIndexBatchSize())
-            .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+            .doOnNext(new Action1<List<ActionRequestBuilder>>() {
                 @Override
-                public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+                public void call(List<ActionRequestBuilder> builders) {
                     try {
                         final BulkRequestBuilder bulkRequest = initRequest();
-                        for (ShardReplicationOperationRequestBuilder builder : builders) {
+                        for (ActionRequestBuilder builder : builders) {
                             indexSizeCounter.dec();
                             if (builder instanceof IndexRequestBuilder) {
                                 bulkRequest.add((IndexRequestBuilder) builder);
@@ -119,6 +123,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                             if (builder instanceof DeleteRequestBuilder) {
                                 bulkRequest.add((DeleteRequestBuilder) builder);
                             }
+                            if(builder instanceof DeleteByQueryRequestBuilder){
+                                DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
+                                deleteByQueryRequestBuilder.get();
+                            }
                         }
                         sendRequest(bulkRequest);
                     }catch (Exception e){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 83ba1ec..7bfbb52 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -182,7 +182,7 @@ public class EntityIndexTest extends BaseIT {
     }
 
     @Test
-    public void testDeleteByQueryWithAlias() throws IOException {
+    public void testDeleteWithAlias() throws IOException {
         Id appId = new SimpleId( "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 513d3bb..53755b3 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -1515,6 +1515,12 @@
                               <groupId>org.apache.maven.surefire</groupId>
                               <artifactId>surefire-junit47</artifactId>
                               <version>${surefire.plugin.version}</version>
+                              <exclusions>
+                                  <exclusion>
+                                      <groupId>org.apache.maven.surfire</groupId>
+                                      <artifactId>common-junit3</artifactId>
+                                  </exclusion>
+                              </exclusions>
                           </dependency>
                       </dependencies>
                   </plugin>