You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/03 17:41:11 UTC
[8/8] incubator-usergrid git commit: async delete by query
async delete by query
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/44c09894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/44c09894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/44c09894
Branch: refs/heads/USERGRID-432
Commit: 44c0989422ed0912f3e5ed5643f79910ac1057e6
Parents: 66f4bfb
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Mar 3 09:40:23 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Mar 3 09:40:23 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 21 ++++---
.../usergrid/corepersistence/CpWalker.java | 4 +-
.../corepersistence/StaleIndexCleanupTest.java | 14 ++---
.../cassandra/EntityManagerFactoryImplIT.java | 25 ++++----
.../usergrid/persistence/index/EntityIndex.java | 6 +-
.../index/IndexOperationMessage.java | 7 ++-
.../index/impl/EsEntityIndexImpl.java | 65 +++++++++++++++-----
.../index/impl/EsIndexBufferConsumerImpl.java | 20 ++++--
.../persistence/index/impl/EntityIndexTest.java | 2 +-
stack/pom.xml | 6 ++
10 files changed, 113 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index bc227d8..6d97605 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -350,7 +350,7 @@ public class CpEntityManager implements EntityManager {
Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
- applicationScope.getApplication(), entityRef.getType());
+ getApplicationScope().getApplication(), entityRef.getType());
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
@@ -433,7 +433,7 @@ public class CpEntityManager implements EntityManager {
CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
- applicationScope.getApplication(), type);
+ getApplicationScope().getApplication(), type);
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
@@ -491,11 +491,16 @@ public class CpEntityManager implements EntityManager {
@Override
public void update( Entity entity ) throws Exception {
-
+ if(entity == null)
+ return;
+ Preconditions.checkNotNull("entity should never be null",entity);
+ String type = entity.getType();
+ Preconditions.checkNotNull("entity type should never be null",type);
+ Id appId = getApplicationScope().getApplication();
+ Preconditions.checkNotNull("app scope should never be null",appId);
// first, update entity index in its own collection scope
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
- applicationScope.getApplication(), entity.getType());
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(appId, type );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
@@ -569,7 +574,7 @@ public class CpEntityManager implements EntityManager {
return Observable.empty();
}
CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
- applicationScope.getApplication(), entityRef.getType() );
+ getApplicationScope().getApplication(), entityRef.getType() );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -2122,7 +2127,7 @@ public class CpEntityManager implements EntityManager {
final Object propertyValue ) {
CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
- applicationScope.getApplication(), collectionName);
+ getApplicationScope().getApplication(), collectionName);
final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -2446,7 +2451,7 @@ public class CpEntityManager implements EntityManager {
org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityToCpEntity( entity, importId );
// prepare to write and index Core Persistence Entity into default scope
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), eType);
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), eType);
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index 928b210..4b902d8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -140,7 +140,9 @@ public class CpWalker {
targetNodeEntityRef.getUuid() );
return;
}
-
+ if(entity == null){
+ return;
+ }
String collName = CpNamingUtils.getCollectionName( edge.getType() );
visitor.visitCollectionEntry( em, collName, entity );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 831e2ce..703a497 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -135,6 +135,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
@Ignore("Broken until search connections is fixed")
public void testStaleIndexCleanup() throws Exception {
+
logger.info( "Started testStaleIndexCleanup()" );
// turn off post processing stuff that cleans up stale entities
@@ -237,7 +238,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
// query for total number of result candidates = numEntities
crs = queryCollectionCp( "things", "thing", "select *" );
- Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size() );
+ Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size() );//20,21
}
@@ -345,7 +346,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
things.add( em.create("thing", new HashMap<String, Object>() {{
put("name", thingName);
}}));
- Thread.sleep( writeDelayMs );
+// Thread.sleep( writeDelayMs );
}
em.refreshIndex();
@@ -382,14 +383,11 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
// wait for indexes to be cleared for the deleted entities
count = 0;
do {
- Thread.sleep(100);
+ if(count>0){Thread.sleep(200);}
crs = queryCollectionCp("things", "thing", "select *");
- em.refreshIndex();
- } while ( crs.size() > 0 && count++ < 15 );
+ } while ( crs.size() == numEntities && count++ < 15 );
- // query Core Persistence directly for total number of result candidates
- crs = queryCollectionCp("things", "thing", "select *");
- Assert.assertEquals( "Expect candidates without earlier stale entities", numEntities, crs.size() );
+ Assert.assertEquals("Expect candidates without earlier stale entities", crs.size(),numEntities);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
index b4a67e8..0ca6e8b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/cassandra/EntityManagerFactoryImplIT.java
@@ -41,6 +41,9 @@ import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.setup.ConcurrentProcessSingleton;
import rx.functions.Func0;
import rx.functions.Func1;
+import rx.functions.Func2;
+
+import javax.annotation.concurrent.NotThreadSafe;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -49,7 +52,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-
+@NotThreadSafe
public class EntityManagerFactoryImplIT extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger( EntityManagerFactoryImplIT.class );
@@ -124,13 +127,13 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
em.refreshIndex();
- Func1< Map<String, UUID> ,Boolean> findDeletedApps = new Func1<Map<String, UUID> ,Boolean>() {
+ Func2<UUID, Map<String, UUID> ,Boolean> findApps = new Func2<UUID,Map<String, UUID> ,Boolean>() {
@Override
- public Boolean call( Map<String, UUID> deletedApps) {
+ public Boolean call(UUID applicationId, Map<String, UUID> apps) {
try {
boolean found = false;
- for (String appName : deletedApps.keySet()) {
- UUID appId = deletedApps.get(appName);
+ for (String appName : apps.keySet()) {
+ UUID appId = apps.get(appName);
if (appId.equals(applicationId)) {
found = true;
break;
@@ -144,8 +147,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
};
boolean found = false;
- for(int i=0;i<5;i++){
- found = findDeletedApps.call(emf.getDeletedApplications());
+ for(int i=0;i<10;i++){
+ found = findApps.call(applicationId,emf.getDeletedApplications());
if(found){
break;
} else{
@@ -179,8 +182,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
// test to see that app now works and is happy
// it should not be found in the deleted apps collection
- for(int i=0;i<5;i++){
- found = findDeletedApps.call(emf.getDeletedApplications());
+ for(int i=0;i<10;i++){
+ found = findApps.call(applicationId,emf.getDeletedApplications());
if(!found){
break;
} else{
@@ -189,8 +192,8 @@ public class EntityManagerFactoryImplIT extends AbstractCoreIT {
}
assertFalse("Restored app found in deleted apps collection", found);
- for(int i=0;i<5;i++){
- found = findDeletedApps.call(setup.getEmf().getApplications());
+ for(int i=0;i<10;i++){
+ found = findApps.call(applicationId,setup.getEmf().getApplications());
if(!found){
break;
} else{
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index d239641..b888e09 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -25,8 +25,10 @@ import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.elasticsearch.action.ListenableActionFuture;
import java.util.Map;
+import java.util.concurrent.Future;
/**
@@ -76,14 +78,14 @@ public interface EntityIndex {
* effectively removing all versions of an entity from all index scopes
* @param entityId The entityId to remove
*/
- public void deleteAllVersionsOfEntity(final Id entityId );
+ public Future deleteAllVersionsOfEntity(final Id entityId );
/**
* Takes all the previous versions of the current entity and deletes all previous versions
* @param id The id to remove
* @param version The max version to retain
*/
- public void deletePreviousVersions(final Id id, final UUID version);
+ public Future deletePreviousVersions(final Id id, final UUID version);
/**
* Refresh the index.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
index 501233e..944a71f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.persistence.index;
import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import java.util.Iterator;
@@ -27,7 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* Container for index operations.
*/
public class IndexOperationMessage {
- private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+ private final ConcurrentLinkedQueue<ActionRequestBuilder> builders;
private final BetterFuture<IndexOperationMessage> containerFuture;
public IndexOperationMessage(){
@@ -41,7 +42,7 @@ public class IndexOperationMessage {
});
}
- public void addOperation(ShardReplicationOperationRequestBuilder builder){
+ public void addOperation(ActionRequestBuilder builder){
builders.add(builder);
}
@@ -49,7 +50,7 @@ public class IndexOperationMessage {
* return operations for the message
* @return
*/
- public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getOperations(){
+ public ConcurrentLinkedQueue<ActionRequestBuilder> getOperations(){
return builders;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index ad638c6..a8b0cc4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -19,6 +19,8 @@ package org.apache.usergrid.persistence.index.impl;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
@@ -35,6 +37,8 @@ import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -444,7 +448,8 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
try {
String[] indexes = ArrayUtils.addAll(
getIndexes(AliasType.Read),
- getIndexes(AliasType.Write) );
+ getIndexes(AliasType.Write)
+ );
if ( indexes.length == 0 ) {
logger.debug( "Not refreshing indexes, none found for app {}",
@@ -510,26 +515,38 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
- public void deleteAllVersionsOfEntity( Id entityId ) {
+ public ListenableActionFuture deleteAllVersionsOfEntity(final Id entityId ) {
String idString = IndexingUtils.idString(entityId).toLowerCase();
final TermQueryBuilder tqb = QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString );
- final DeleteByQueryResponse response = esProvider.getClient()
- .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute().actionGet();
+ final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+ .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
+ response.addListener(new ActionListener<DeleteByQueryResponse>() {
+ @Override
+ public void onResponse(DeleteByQueryResponse response) {
+ logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
+ entityId.getType(), entityId.getUuid(), response.status().toString());
+
+ checkDeleteByQueryResponse( tqb, response );
+ }
- logger.debug( "Deleted entity {}:{} from all index scopes with response status = {}",
- entityId.getType(), entityId.getUuid(), response.status().toString());
+ @Override
+ public void onFailure(Throwable e) {
+ logger.error("Deleted entity {}:{} from all index scopes with error {}",
+ entityId.getType(), entityId.getUuid(), e);
- checkDeleteByQueryResponse( tqb, response );
+ }
+ });
+ return response;
}
@Override
- public void deletePreviousVersions( final Id entityId, final UUID version ) {
+ public ListenableActionFuture deletePreviousVersions( final Id entityId, final UUID version ) {
String idString = IndexingUtils.idString( entityId ).toLowerCase();
@@ -538,15 +555,28 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
FilterBuilders.rangeFilter(ENTITY_VERSION_FIELDNAME).lt(version.timestamp())
);
- final DeleteByQueryResponse response = esProvider.getClient()
- .prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( fqb ).execute().actionGet();
+ final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+ .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(fqb).execute();
- //error message needs to be retooled so that it describes the entity more throughly
- logger.debug( "Deleted entity {}:{} with version {} from all "
- + "index scopes with response status = {}",
- entityId.getType(), entityId.getUuid(), version, response.status().toString() );
+ response.addListener(new ActionListener<DeleteByQueryResponse>() {
+ @Override
+ public void onResponse(DeleteByQueryResponse response) {
+ //error message needs to be retooled so that it describes the entity more throughly
+ logger.debug( "Deleted entity {}:{} with version {} from all "
+ + "index scopes with response status = {}",
+ entityId.getType(), entityId.getUuid(), version, response.status().toString() );
+
+ checkDeleteByQueryResponse( fqb, response );
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ logger.error("Deleted entity {}:{} from all index scopes with error {}",
+ entityId.getType(), entityId.getUuid(), e);
+ }
+ });
- checkDeleteByQueryResponse( fqb, response );
+ return response;
}
@@ -560,13 +590,14 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
for ( ShardOperationFailedException failedException : failures ) {
- throw new IndexException( String.format("Unable to delete by query %s. "
+ logger.error( String.format("Unable to delete by query %s. "
+ "Failed with code %d and reason %s on shard %s in index %s",
query.toString(),
failedException.status().getStatus(),
failedException.reason(),
failedException.shardId(),
- failedException.index() ) );
+ failedException.index() )
+ );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 09c7097..fbb7a40 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -28,11 +28,13 @@ import org.apache.usergrid.persistence.index.IndexBufferConsumer;
import org.apache.usergrid.persistence.index.IndexBufferProducer;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
@@ -94,24 +96,26 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
//process and flatten all the messages to builder requests
- Observable<ShardReplicationOperationRequestBuilder> flattenMessages = Observable.from(operationMessages)
+ Observable<ActionRequestBuilder> flattenMessages = Observable.from(operationMessages)
.subscribeOn(Schedulers.io())
- .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
+ .flatMap(new Func1<IndexOperationMessage, Observable<ActionRequestBuilder>>() {
@Override
- public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage operationMessage) {
+ public Observable<ActionRequestBuilder> call(IndexOperationMessage operationMessage) {
return Observable.from(operationMessage.getOperations());
}
});
+
+
//batch shard operations into a bulk request
flattenMessages
.buffer(config.getIndexBatchSize())
- .doOnNext(new Action1<List<ShardReplicationOperationRequestBuilder>>() {
+ .doOnNext(new Action1<List<ActionRequestBuilder>>() {
@Override
- public void call(List<ShardReplicationOperationRequestBuilder> builders) {
+ public void call(List<ActionRequestBuilder> builders) {
try {
final BulkRequestBuilder bulkRequest = initRequest();
- for (ShardReplicationOperationRequestBuilder builder : builders) {
+ for (ActionRequestBuilder builder : builders) {
indexSizeCounter.dec();
if (builder instanceof IndexRequestBuilder) {
bulkRequest.add((IndexRequestBuilder) builder);
@@ -119,6 +123,10 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
if (builder instanceof DeleteRequestBuilder) {
bulkRequest.add((DeleteRequestBuilder) builder);
}
+ if(builder instanceof DeleteByQueryRequestBuilder){
+ DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = (DeleteByQueryRequestBuilder) builder;
+ deleteByQueryRequestBuilder.get();
+ }
}
sendRequest(bulkRequest);
}catch (Exception e){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 83ba1ec..7bfbb52 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -182,7 +182,7 @@ public class EntityIndexTest extends BaseIT {
}
@Test
- public void testDeleteByQueryWithAlias() throws IOException {
+ public void testDeleteWithAlias() throws IOException {
Id appId = new SimpleId( "application" );
ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/44c09894/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 513d3bb..53755b3 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -1515,6 +1515,12 @@
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${surefire.plugin.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.maven.surfire</groupId>
+ <artifactId>common-junit3</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
</plugin>