You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/20 22:32:59 UTC
[26/31] incubator-usergrid git commit: add delete back
add delete back
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf80b8bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf80b8bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf80b8bb
Branch: refs/heads/USERGRID-486
Commit: cf80b8bb1196d0fedb0d3b7e7b758a4b17113794
Parents: f95a756
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 20 14:55:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 20 14:55:27 2015 -0600
----------------------------------------------------------------------
.../corepersistence/StaleIndexCleanupTest.java | 67 ++++++----------
.../usergrid/persistence/CollectionIT.java | 10 +--
.../PerformanceEntityRebuildIndexTest.java | 71 ++++++++---------
.../index/ApplicationEntityIndex.java | 8 ++
.../impl/EsApplicationEntityIndexImpl.java | 80 ++++++++++++++++++--
.../index/impl/EsEntityIndexBatchImpl.java | 7 +-
.../index/impl/EsEntityIndexImpl.java | 12 ++-
7 files changed, 159 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 6ba1353..e27420b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -94,40 +94,35 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
public void testUpdateVersioning() throws Exception {
// turn off post processing stuff that cleans up stale entities
- System.setProperty( EVENTS_DISABLED, "true" );
+ System.setProperty(EVENTS_DISABLED, "true");
final EntityManager em = app.getEntityManager();
- Entity thing = em.create( "thing", new HashMap<String, Object>() {{
- put( "name", "thing1" );
- }} );
+ Entity thing = em.create("thing", new HashMap<String, Object>() {{
+ put("name", "thing1");
+ }});
app.refreshIndex();
- assertEquals( 1, queryCollectionCp( "things", "thing", "select *" ).size() );
+ assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
- org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity( thing );
+ org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity(thing);
UUID oldVersion = cpEntity.getVersion();
- em.updateProperties( thing, new HashMap<String, Object>() {{
- put( "stuff", "widget" );
- }} );
+ em.updateProperties(thing, new HashMap<String, Object>() {{
+ put("stuff", "widget");
+ }});
app.refreshIndex();
- org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity( thing );
- assertEquals( "widget", cpUpdated.getField( "stuff" ).getValue() );
+ org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
+ assertEquals("widget", cpUpdated.getField("stuff").getValue());
UUID newVersion = cpUpdated.getVersion();
- assertTrue( "New version is greater than old",
- UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
+ assertTrue("New version is greater than old",
+ UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
CandidateResults results;
- do{
- results = queryCollectionCp( "things", "thing", "select *" );
- if(results.size()!=2){
- Thread.sleep(200);
- }
- }while(results.size()!=2);
- assertEquals( 2, results.size() );
+ results = queryCollectionCp("things", "thing", "select *");
+ assertEquals(2, results.size());
}
@@ -160,37 +155,28 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
CandidateResults candidateResults = null;
- do{
- candidateResults = queryCollectionCp("things", "thing", "select * order by ordinal desc");
- if(candidateResults.size()!=2){
- Thread.sleep(200);
- }
- }while(candidateResults.size()<2);
+
+ candidateResults = queryCollectionCp("things", "thing", "select * order by ordinal desc");
+ if(candidateResults.size()!=2){
+ Thread.sleep(200);
+ }
+
assertEquals(2, candidateResults.size());
//now run enable events and ensure we clean up
System.setProperty(EVENTS_DISABLED, "false");
- Results results = null;
- do{
- results = queryCollectionEm("things", "select * order by ordinal desc");;
- if(results.size()!=1){
- Thread.sleep(200);
- }
- }while(results.size()<1);
+ Results results = queryCollectionEm("things", "select * order by ordinal desc");
+
assertEquals( 1, results.size() );
assertEquals(1, results.getEntities().get( 0 ).getProperty( "ordinal" ));
app.refreshIndex();
//ensure it's actually gone
- do{
- candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" );
- if(candidateResults.size()!=1){
- Thread.sleep(200);
- }
- }while(candidateResults.size()!=1);
+ candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" );
+
assertEquals(1, candidateResults.size());
assertEquals(newVersion, candidateResults.get(0).getVersion());
@@ -397,7 +383,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
//trigger the repair
results = queryCollectionEm("things", "select *");
crs = queryCollectionCp("things", "thing", "select *");
- Thread.sleep(100);
} while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 );
@@ -462,8 +447,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
// wait for indexes to be cleared for the deleted entities
count = 0;
do {
- queryCollectionEm("dogs", "select *");
- Thread.sleep(100);
crs = queryCollectionCp("dogs", "dog", "select *");
} while ( crs.size() != numEntities && count++ < 15 );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index b1a75d9..3e1db1b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -1231,13 +1231,9 @@ public class CollectionIT extends AbstractCoreIT {
query.addEqualityFilter( "rootprop1", "simpleprop" );
Entity entity;
Results results;
- do {
- results = em.searchCollection(em.getApplicationRef(), "tests", query);
- entity = results.getEntitiesMap().get(saved.getUuid());
- if (entity == null) {
- Thread.sleep(200);
- }
- }while(entity == null);
+ results = em.searchCollection(em.getApplicationRef(), "tests", query);
+ entity = results.getEntitiesMap().get(saved.getUuid());
+
assertNotNull( entity );
// query on the nested int value
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index af31b51..4b284df 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -175,19 +175,19 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- delete the system and application indexes
logger.debug("Deleting app index index");
-// //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
-// deleteIndex( em.getApplicationId() );
-//
-// // ----------------- test that we can read them, should fail
-//
-// logger.debug("Reading data, should fail this time ");
-// try {
-// readData( em, "testTypes", entityCount, 0 );
-// fail("should have failed to read data");
-//
-// } catch (Exception expected) {}
+ //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+ deleteIndex( em.getApplicationId() );
+
+ // ----------------- test that we can read them, should fail
- // ----------------- rebuild index for catherders only
+ logger.debug("Reading data, should fail this time ");
+ try {
+ readData( em, "testTypes", entityCount, 0 );
+ fail("should have failed to read data");
+
+ } catch (Exception expected) {}
+
+// ----------------- rebuild index for catherders only
logger.debug("Preparing to rebuild all indexes");;
@@ -313,20 +313,20 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
logger.debug("Deleting app index and system app index");
-// deleteIndex( em.getApplicationId() );
+ deleteIndex( em.getApplicationId() );
//
// // deleting sytem app index will interfere with other concurrently running tests
-// //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+ //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
//
//
// // ----------------- test that we can read them, should fail
//
-// logger.debug("Reading data, should fail this time ");
-// try {
-// readData( em, "testTypes", entityCount, 3 );
-// fail("should have failed to read data");
-//
-// } catch (Exception expected) {}
+ logger.debug("Reading data, should fail this time ");
+ try {
+ readData( em, "testTypes", entityCount, 3 );
+ fail("should have failed to read data");
+
+ } catch (Exception expected) {}
// ----------------- rebuild index
@@ -371,23 +371,26 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them
+ Thread.sleep(2000);
readData( em, "testTypes", entityCount, 3 );
}
-// /**
-// * Delete index for all applications, just need the one to get started.
-// */
-// private void deleteIndex( UUID appUuid ) {
-//
-// Injector injector = SpringResource.getInstance().getBean( Injector.class );
-// EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
-//
-// Id appId = new SimpleId( appUuid, "application");
-// ApplicationScope scope = new ApplicationScopeImpl( appId );
-// ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
-// EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei;
-//
-// }
+ /**
+ * Delete the indexed records of the given application, then refresh the index.
+ */
+ private void deleteIndex( UUID appUuid ) {
+
+ Injector injector = SpringResource.getInstance().getBean( Injector.class );
+ EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
+
+ Id appId = new SimpleId( appUuid, "application");
+ ApplicationScope scope = new ApplicationScopeImpl( appId );
+ ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+
+ ei.deleteApplication().toBlocking().lastOrDefault(null);
+ app.refreshIndex();
+
+ }
private int readData( EntityManager em,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index fab32b3..34967bd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -19,8 +19,10 @@
*/
package org.apache.usergrid.persistence.index;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
+import org.elasticsearch.action.ListenableActionFuture;
import rx.Observable;
/**
@@ -39,4 +41,10 @@ public interface ApplicationEntityIndex {
* Execute query in Usergrid syntax.
*/
public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query );
+
+ /**
+ * delete all application records
+ * @return an Observable of the delete operation's results
+ */
+ public Observable deleteApplication();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index c83fe41..3633c5b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang3.ArrayUtils;
@@ -62,9 +64,7 @@ import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -89,6 +89,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
private final IndexFig indexFig;
private final EsProvider esProvider;
private final IndexIdentifier.IndexAlias alias;
+ private final Timer deleteApplicationTimer;
+ private final Meter deleteApplicationMeter;
private FailureMonitor failureMonitor;
private final int cursorTimeout;
@Inject
@@ -109,11 +111,16 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
mapManager = mapManagerFactory.createMapManager(mapScope);
this.searchTimer = metricsFactory
- .getTimer(EsEntityIndexImpl.class, "search.timer");
+ .getTimer(EsApplicationEntityIndexImpl.class, "search.timer");
this.cursorTimer = metricsFactory
- .getTimer(EsEntityIndexImpl.class, "search.cursor.timer");
+ .getTimer(EsApplicationEntityIndexImpl.class, "search.cursor.timer");
this.cursorTimeout = config.getQueryCursorTimeout();
+ this.deleteApplicationTimer = metricsFactory
+ .getTimer(EsApplicationEntityIndexImpl.class, "delete.application");
+ this.deleteApplicationMeter = metricsFactory
+ .getMeter(EsApplicationEntityIndexImpl.class, "delete.application.meter");
+
this.alias = indexIdentifier.getAlias();
}
@@ -256,6 +263,69 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
return parseResults(searchResponse, query);
}
+ /**
+ * Delete all documents of this application via delete-by-query on the application id; no index is dropped.
+ */
+ public Observable deleteApplication() {
+ deleteApplicationMeter.mark();
+ String idString = IndexingUtils.idString(applicationScope.getApplication());
+ final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
+ Set<String> indexSet = new HashSet<>();
+ List<String> reads = Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read));
+ List<String> writes = Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Write));
+ indexSet.addAll(reads);
+ indexSet.addAll(writes);
+ String[] indexes = indexSet.toArray(new String[0]);
+ Timer.Context timer = deleteApplicationTimer.time();
+ //Added For Graphite Metrics
+ return Observable.from(indexes)
+ .flatMap(index -> {
+
+ final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+ .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(tqb).execute();
+
+ response.addListener(new ActionListener<DeleteByQueryResponse>() {
+
+ @Override
+ public void onResponse(DeleteByQueryResponse response) {
+ checkDeleteByQueryResponse(tqb, response);
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ logger.error("failed on delete index", e);
+ }
+ });
+ return Observable.from(response);
+ })
+ .doOnCompleted(() -> timer.stop());
+ }
+
+ /**
+ * Inspect the delete-by-query response and log an error for every shard failure it reports (does not throw).
+ */
+ private void checkDeleteByQueryResponse(
+ final QueryBuilder query, final DeleteByQueryResponse response ) {
+
+ for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
+ final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
+
+ for ( ShardOperationFailedException failedException : failures ) {
+ logger.error( String.format("Unable to delete by query %s. "
+ + "Failed with code %d and reason %s on shard %s in index %s",
+ query.toString(),
+ failedException.status().getStatus(),
+ failedException.reason(),
+ failedException.shardId(),
+ failedException.index() )
+ );
+ }
+
+ }
+ }
+
+
+
private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 38bf381..7b0c3b5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -95,9 +95,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
ValidationUtils.verifyEntityWrite( entity );
ValidationUtils.verifyVersion( entity.getVersion() );
//add app id for indexing
- entity.setField(
- new StringField(APPLICATION_ID_FIELDNAME, IndexingUtils.idString(applicationScope.getApplication()))
- );
+
final String context = createContextName(applicationScope,indexScope);
if ( log.isDebugEnabled() ) {
@@ -110,7 +108,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
ValidationUtils.verifyEntityWrite( entity );
Map<String, Object> entityAsMap = entityToMap( entity, context );
-
+ //add app id
+ entityAsMap.put(APPLICATION_ID_FIELDNAME, idString(applicationScope.getApplication()));
// need prefix here because we index UUIDs as strings
// let caller add these fields if needed
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8bdd663..7029bba 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.*;
/**
@@ -338,10 +339,13 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public boolean doOp() {
try {
- String[] indexes = ArrayUtils.addAll(
- getIndexes(AliasType.Read),
- getIndexes(AliasType.Write)
- );
+
+ Set<String> indexSet = new HashSet<>();
+ List<String> reads = Arrays.asList(getIndexes(AliasType.Read));
+ List<String> writes = Arrays.asList(getIndexes(AliasType.Write));
+ indexSet.addAll(reads);
+ indexSet.addAll(writes);
+ String[] indexes = indexSet.toArray(new String[0]);
if ( indexes.length == 0 ) {
logger.debug( "Not refreshing indexes. none found");