You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/20 22:32:59 UTC

[26/31] incubator-usergrid git commit: add delete back

add delete back


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf80b8bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf80b8bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf80b8bb

Branch: refs/heads/USERGRID-486
Commit: cf80b8bb1196d0fedb0d3b7e7b758a4b17113794
Parents: f95a756
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Mar 20 14:55:27 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Mar 20 14:55:27 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/StaleIndexCleanupTest.java  | 67 ++++++----------
 .../usergrid/persistence/CollectionIT.java      | 10 +--
 .../PerformanceEntityRebuildIndexTest.java      | 71 ++++++++---------
 .../index/ApplicationEntityIndex.java           |  8 ++
 .../impl/EsApplicationEntityIndexImpl.java      | 80 ++++++++++++++++++--
 .../index/impl/EsEntityIndexBatchImpl.java      |  7 +-
 .../index/impl/EsEntityIndexImpl.java           | 12 ++-
 7 files changed, 159 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 6ba1353..e27420b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -94,40 +94,35 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
     public void testUpdateVersioning() throws Exception {
 
         // turn off post processing stuff that cleans up stale entities
-        System.setProperty( EVENTS_DISABLED, "true" );
+        System.setProperty(EVENTS_DISABLED, "true");
 
         final EntityManager em = app.getEntityManager();
 
-        Entity thing = em.create( "thing", new HashMap<String, Object>() {{
-            put( "name", "thing1" );
-        }} );
+        Entity thing = em.create("thing", new HashMap<String, Object>() {{
+            put("name", "thing1");
+        }});
         app.refreshIndex();
 
-        assertEquals( 1, queryCollectionCp( "things", "thing", "select *" ).size() );
+        assertEquals(1, queryCollectionCp("things", "thing", "select *").size());
 
-        org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity( thing );
+        org.apache.usergrid.persistence.model.entity.Entity cpEntity = getCpEntity(thing);
         UUID oldVersion = cpEntity.getVersion();
 
-        em.updateProperties( thing, new HashMap<String, Object>() {{
-            put( "stuff", "widget" );
-        }} );
+        em.updateProperties(thing, new HashMap<String, Object>() {{
+            put("stuff", "widget");
+        }});
         app.refreshIndex();
 
-        org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity( thing );
-        assertEquals( "widget", cpUpdated.getField( "stuff" ).getValue() );
+        org.apache.usergrid.persistence.model.entity.Entity cpUpdated = getCpEntity(thing);
+        assertEquals("widget", cpUpdated.getField("stuff").getValue());
         UUID newVersion = cpUpdated.getVersion();
 
-        assertTrue( "New version is greater than old",
-                UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
+        assertTrue("New version is greater than old",
+            UUIDComparator.staticCompare(newVersion, oldVersion) > 0);
 
         CandidateResults results;
-        do{
-             results = queryCollectionCp( "things", "thing", "select *" );
-            if(results.size()!=2){
-                Thread.sleep(200);
-            }
-        }while(results.size()!=2);
-        assertEquals( 2, results.size() );
+        results = queryCollectionCp("things", "thing", "select *");
+        assertEquals(2, results.size());
     }
 
 
@@ -160,37 +155,28 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         CandidateResults candidateResults = null;
 
-        do{
-            candidateResults = queryCollectionCp("things", "thing", "select * order by ordinal desc");
-            if(candidateResults.size()!=2){
-                Thread.sleep(200);
-            }
-        }while(candidateResults.size()<2);
+
+        candidateResults = queryCollectionCp("things", "thing", "select * order by ordinal desc");
+        if(candidateResults.size()!=2){
+            Thread.sleep(200);
+        }
+
 
         assertEquals(2, candidateResults.size());
 
         //now run enable events and ensure we clean up
         System.setProperty(EVENTS_DISABLED, "false");
 
-        Results results = null;
-        do{
-            results =  queryCollectionEm("things", "select * order by ordinal desc");;
-            if(results.size()!=1){
-                Thread.sleep(200);
-            }
-        }while(results.size()<1);
+        Results results =  queryCollectionEm("things", "select * order by ordinal desc");
+
         assertEquals( 1, results.size() );
         assertEquals(1, results.getEntities().get( 0 ).getProperty( "ordinal" ));
 
         app.refreshIndex();
 
         //ensure it's actually gone
-        do{
-            candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" );
-            if(candidateResults.size()!=1){
-                Thread.sleep(200);
-            }
-        }while(candidateResults.size()!=1);
+        candidateResults = queryCollectionCp( "things", "thing", "select * order by ordinal desc" );
+
         assertEquals(1, candidateResults.size());
 
         assertEquals(newVersion, candidateResults.get(0).getVersion());
@@ -397,7 +383,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             //trigger the repair
             results = queryCollectionEm("things", "select *");
             crs = queryCollectionCp("things", "thing", "select *");
-            Thread.sleep(100);
 
         } while ((results.hasCursor() || crs.size() > 0) && count++ < 2000 );
 
@@ -462,8 +447,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         // wait for indexes to be cleared for the deleted entities
         count = 0;
         do {
-            queryCollectionEm("dogs", "select *");
-            Thread.sleep(100);
             crs = queryCollectionCp("dogs", "dog", "select *");
         } while ( crs.size() != numEntities && count++ < 15 );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index b1a75d9..3e1db1b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -1231,13 +1231,9 @@ public class CollectionIT extends AbstractCoreIT {
         query.addEqualityFilter( "rootprop1", "simpleprop" );
         Entity entity;
         Results results;
-        do {
-            results = em.searchCollection(em.getApplicationRef(), "tests", query);
-            entity = results.getEntitiesMap().get(saved.getUuid());
-            if (entity == null) {
-                Thread.sleep(200);
-            }
-        }while(entity == null);
+        results = em.searchCollection(em.getApplicationRef(), "tests", query);
+        entity = results.getEntitiesMap().get(saved.getUuid());
+
         assertNotNull( entity );
 
         // query on the nested int value

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index af31b51..4b284df 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -175,19 +175,19 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         // ----------------- delete the system and application indexes
 
         logger.debug("Deleting app index index");
-//        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
-//        deleteIndex( em.getApplicationId() );
-//
-//        // ----------------- test that we can read them, should fail
-//
-//        logger.debug("Reading data, should fail this time ");
-//        try {
-//            readData( em,  "testTypes", entityCount, 0 );
-//            fail("should have failed to read data");
-//
-//        } catch (Exception expected) {}
+        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+        deleteIndex( em.getApplicationId() );
+
+        // ----------------- test that we can read them, should fail
 
-        // ----------------- rebuild index for catherders only
+        logger.debug("Reading data, should fail this time ");
+        try {
+            readData( em,  "testTypes", entityCount, 0 );
+            fail("should have failed to read data");
+
+        } catch (Exception expected) {}
+
+//        ----------------- rebuild index for catherders only
 
         logger.debug("Preparing to rebuild all indexes");;
 
@@ -313,20 +313,20 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         logger.debug("Deleting app index and system app index");
 
-//        deleteIndex( em.getApplicationId() );
+        deleteIndex( em.getApplicationId() );
 //
 //        // deleting sytem app index will interfere with other concurrently running tests
-//        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
+        //deleteIndex( CpNamingUtils.SYSTEM_APP_ID );
 //
 //
 //        // ----------------- test that we can read them, should fail
 //
-//        logger.debug("Reading data, should fail this time ");
-//        try {
-//            readData( em, "testTypes", entityCount, 3 );
-//            fail("should have failed to read data");
-//
-//        } catch (Exception expected) {}
+        logger.debug("Reading data, should fail this time ");
+        try {
+            readData( em, "testTypes", entityCount, 3 );
+            fail("should have failed to read data");
+
+        } catch (Exception expected) {}
 
         // ----------------- rebuild index
 
@@ -371,23 +371,26 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them
 
+        Thread.sleep(2000);
         readData( em, "testTypes", entityCount, 3 );
     }
 
-//    /**
-//     * Delete index for all applications, just need the one to get started.
-//     */
-//    private void deleteIndex( UUID appUuid ) {
-//
-//        Injector injector = SpringResource.getInstance().getBean( Injector.class );
-//        EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
-//
-//        Id appId = new SimpleId( appUuid, "application");
-//        ApplicationScope scope = new ApplicationScopeImpl( appId );
-//        ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
-//        EsEntityIndexImpl eeii = (EsEntityIndexImpl)ei;
-//
-//    }
+    /**
+     * Delete index for all applications, just need the one to get started.
+     */
+    private void deleteIndex( UUID appUuid ) {
+
+        Injector injector = SpringResource.getInstance().getBean( Injector.class );
+        EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
+
+        Id appId = new SimpleId( appUuid, "application");
+        ApplicationScope scope = new ApplicationScopeImpl( appId );
+        ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+
+        ei.deleteApplication().toBlocking().lastOrDefault(null);
+        app.refreshIndex();
+
+    }
 
 
     private int readData( EntityManager em,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index fab32b3..34967bd 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -19,8 +19,10 @@
  */
 package org.apache.usergrid.persistence.index;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.elasticsearch.action.ListenableActionFuture;
 import rx.Observable;
 
 /**
@@ -39,4 +41,10 @@ public interface ApplicationEntityIndex {
      * Execute query in Usergrid syntax.
      */
     public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, Query query );
+
+    /**
+     * delete all application records
+     * @return
+     */
+    public Observable deleteApplication();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index c83fe41..3633c5b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.persistence.index.impl;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang3.ArrayUtils;
@@ -62,9 +64,7 @@ import rx.Observable;
 import rx.functions.Action1;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -89,6 +89,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
     private final IndexFig indexFig;
     private final EsProvider esProvider;
     private final IndexIdentifier.IndexAlias alias;
+    private final Timer deleteApplicationTimer;
+    private final Meter deleteApplicationMeter;
     private FailureMonitor failureMonitor;
     private final int cursorTimeout;
     @Inject
@@ -109,11 +111,16 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
 
         mapManager = mapManagerFactory.createMapManager(mapScope);
         this.searchTimer = metricsFactory
-            .getTimer(EsEntityIndexImpl.class, "search.timer");
+            .getTimer(EsApplicationEntityIndexImpl.class, "search.timer");
         this.cursorTimer = metricsFactory
-            .getTimer(EsEntityIndexImpl.class, "search.cursor.timer");
+            .getTimer(EsApplicationEntityIndexImpl.class, "search.cursor.timer");
         this.cursorTimeout = config.getQueryCursorTimeout();
 
+        this.deleteApplicationTimer = metricsFactory
+            .getTimer(EsApplicationEntityIndexImpl.class, "delete.application");
+        this.deleteApplicationMeter = metricsFactory
+            .getMeter(EsApplicationEntityIndexImpl.class, "delete.application.meter");
+
         this.alias = indexIdentifier.getAlias();
 
     }
@@ -256,6 +263,69 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
         return parseResults(searchResponse, query);
     }
 
+    /**
+     * Completely delete an index.
+     */
+    public Observable deleteApplication() {
+        deleteApplicationMeter.mark();
+        String idString = IndexingUtils.idString(applicationScope.getApplication());
+        final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
+        Set<String> indexSet = new HashSet<>();
+        List<String> reads =  Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Read));
+        List<String> writes = Arrays.asList(entityIndex.getIndexes(AliasedEntityIndex.AliasType.Write));
+        indexSet.addAll(reads);
+        indexSet.addAll(writes);
+        String[] indexes = indexSet.toArray(new String[0]);
+        Timer.Context timer = deleteApplicationTimer.time();
+        //Added For Graphite Metrics
+        return Observable.from(indexes)
+            .flatMap(index -> {
+
+                final ListenableActionFuture<DeleteByQueryResponse> response = esProvider.getClient()
+                    .prepareDeleteByQuery(alias.getWriteAlias()).setQuery(tqb).execute();
+
+                response.addListener(new ActionListener<DeleteByQueryResponse>() {
+
+                    @Override
+                    public void onResponse(DeleteByQueryResponse response) {
+                        checkDeleteByQueryResponse(tqb, response);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable e) {
+                        logger.error("failed on delete index", e);
+                    }
+                });
+                return Observable.from(response);
+            })
+            .doOnCompleted(() -> timer.stop());
+    }
+
+    /**
+     * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
+     */
+    private void checkDeleteByQueryResponse(
+        final QueryBuilder query, final DeleteByQueryResponse response ) {
+
+        for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
+            final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
+
+            for ( ShardOperationFailedException failedException : failures ) {
+                logger.error( String.format("Unable to delete by query %s. "
+                            + "Failed with code %d and reason %s on shard %s in index %s",
+                        query.toString(),
+                        failedException.status().getStatus(),
+                        failedException.reason(),
+                        failedException.shardId(),
+                        failedException.index() )
+                );
+            }
+
+        }
+    }
+
+
+
 
     private CandidateResults parseResults( final SearchResponse searchResponse, final Query query ) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 38bf381..7b0c3b5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -95,9 +95,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         ValidationUtils.verifyEntityWrite( entity );
         ValidationUtils.verifyVersion( entity.getVersion() );
         //add app id for indexing
-        entity.setField(
-            new StringField(APPLICATION_ID_FIELDNAME, IndexingUtils.idString(applicationScope.getApplication()))
-        );
+
         final String context = createContextName(applicationScope,indexScope);
 
         if ( log.isDebugEnabled() ) {
@@ -110,7 +108,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         ValidationUtils.verifyEntityWrite( entity );
 
         Map<String, Object> entityAsMap = entityToMap( entity, context );
-
+        //add app id
+        entityAsMap.put(APPLICATION_ID_FIELDNAME, idString(applicationScope.getApplication()));
         // need prefix here because we index UUIDs as strings
 
         // let caller add these fields if needed

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf80b8bb/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8bdd663..7029bba 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.*;
 
 
 /**
@@ -338,10 +339,13 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
             @Override
             public boolean doOp() {
                 try {
-                    String[] indexes = ArrayUtils.addAll(
-                        getIndexes(AliasType.Read),
-                        getIndexes(AliasType.Write)
-                    );
+
+                    Set<String> indexSet = new HashSet<>();
+                    List<String> reads =  Arrays.asList(getIndexes(AliasType.Read));
+                    List<String> writes = Arrays.asList(getIndexes(AliasType.Write));
+                    indexSet.addAll(reads);
+                    indexSet.addAll(writes);
+                    String[] indexes = indexSet.toArray(new String[0]);
 
                     if ( indexes.length == 0 ) {
                         logger.debug( "Not refreshing indexes. none found");