You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/09 21:28:09 UTC

git commit: Only init ElasticSearch mappings once on startup, also: some tweaks to the stale index test.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o c0edbe191 -> 3fdc74f52


Only init ElasticSearch mappings once on startup, also: some tweaks to the stale index test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3fdc74f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3fdc74f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3fdc74f5

Branch: refs/heads/two-dot-o
Commit: 3fdc74f52294b890310020f2fcc77e183dbf7cd2
Parents: c0edbe1
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 15:27:52 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 15:27:52 2014 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  3 -
 .../corepersistence/StaleIndexCleanupTest.java  | 51 +++++++++-----
 .../index/impl/EsEntityIndexBatchImpl.java      | 16 ++---
 .../index/impl/EsEntityIndexImpl.java           | 73 +++++++++++---------
 4 files changed, 79 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1a8f17c..0cf89b8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -494,9 +494,6 @@ public class CpEntityManager implements EntityManager {
                 cpEntity, entity.getProperties(), entity.getType(), true );
 
         try {
-            logger.debug("About to Write {}:{} version {}", new Object[] { 
-                cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
-
             cpEntity = ecm.update( cpEntity ).toBlockingObservable().last();
             cpEntity = ecm.load( entityId ).toBlockingObservable().last();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index c5d5782..eab82b1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -54,10 +54,14 @@ import org.slf4j.LoggerFactory;
 public class StaleIndexCleanupTest extends AbstractCoreIT {
     private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
 
-    private static final long writeDelayMs = 80;
-    //private static final long readDelayMs = 7;
+    // take it easy on embedded Cassandra
+    private static final long writeDelayMs = 50;
+    private static final long readDelayMs = 50;
 
 
+    /**
+     * Test that updating an entity causes the entity's version number to change.
+     */
     @Test
     public void testUpdateVersioning() throws Exception {
 
@@ -82,15 +86,17 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         assertEquals( "widget", cpUpdated.getField("stuff").getValue());
         UUID newVersion = cpUpdated.getVersion();
 
-        // this assertion fails
         assertTrue( "New version is greater than old", 
                 UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
 
-        // this fails too 
         assertEquals( 2, queryCollectionCp("things", "select *").size() );
     }
 
 
+    /**
+     * Test that the CpRelationManager cleans up any stale indexes that it finds when it is
+     * building search results.
+     */
     @Test
     public void testStaleIndexCleanup() throws Exception {
 
@@ -100,8 +106,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         final EntityManager em = app.getEntityManager();
 
-        int numEntities = 100;
-        int numUpdates = 10;
+        final int numEntities = 100;
+        final int numUpdates = 5;
 
         // create lots of entities
         final List<Entity> things = new ArrayList<Entity>();
@@ -115,7 +121,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
         em.refreshIndex();
 
         CandidateResults crs = queryCollectionCp( "things", "select *");
-        Assert.assertEquals( numEntities, crs.size() );
+        Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
 
         // update each one a bunch of times
         int count = 0;
@@ -128,34 +134,43 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
                 em.update(toUpdate);
 
                 Thread.sleep( writeDelayMs );
-                em.refreshIndex();
-                count++;
 
+                count++;
                 if ( count % 100 == 0 ) {
                     logger.info("Updated {} of {} times", count, numEntities * numUpdates);
                 }
             }
         }
+        em.refreshIndex();
 
         // query Core Persistence directly for total number of result candidates
-        // should be entities X updates because of stale indexes 
         crs = queryCollectionCp("things", "select *");
-        Assert.assertEquals( numEntities * numUpdates, crs.size() );
+        Assert.assertEquals( "Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
 
-        // query EntityManager for results
-        // should return 100 becuase it filters out the stale entities
+        // query EntityManager for results and page through them
+        // should return numEntities because it filters out the stale entities
         Query q = Query.fromQL("select *");
-        q.setLimit( 10000 );
-        Results results = em.searchCollection( em.getApplicationRef(), "things", q);
-        assertEquals( numEntities, results.size() );
+        q.setLimit( 8 ); 
+        int thingCount = 0;
+        String cursor = null;
+        do {
+            Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+            cursor = results.getCursor();
+            if ( cursor != null ) {
+                assertEquals( 8, results.size() );
+            }
+            thingCount += results.size();
+
+        } while ( cursor != null );
+        assertEquals( "Expect no stale candidates", numEntities, thingCount );
 
         // EntityManager should have kicked off a batch cleanup of those stale indexes
         // wait a second for batch cleanup to complete
         Thread.sleep(600);
 
-        // query for total number of result candidates = 100
+        // query for total number of result candidates = numEntities
         crs = queryCollectionCp("things", "select *");
-        Assert.assertEquals( numEntities, crs.size() );
+        Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size());
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 9e70a00..56fb7b7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -58,7 +58,6 @@ import org.apache.usergrid.persistence.model.field.UUIDField;
 import org.apache.usergrid.persistence.model.field.value.EntityObject;
 
 import com.google.common.base.Joiner;
-import java.io.IOException;
 
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
@@ -69,10 +68,6 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PR
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createCollectionScopeTypeName;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexName;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
 
 
 public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -83,10 +78,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private final Client client;
 
-    // Keep track of what types we have already initialized to avoid cost
-    // of attempting to init them again. Used in the initType() method.
-    private final Set<String> knownTypes;
-
     private final boolean refresh;
 
     private final String indexName;
@@ -99,12 +90,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
 
     public EsEntityIndexBatchImpl( 
-            final ApplicationScope applicationScope, final Client client, final IndexFig config,
-            final Set<String> knownTypes, final int autoFlushSize ) {
+            final ApplicationScope applicationScope, 
+            final Client client, 
+            final IndexFig config,
+            final int autoFlushSize ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
-        this.knownTypes = knownTypes;
         this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
         this.refresh = config.isForcedRefresh();
         this.autoFlushSize = autoFlushSize;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8c07d6d..8e53e5c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -19,8 +19,6 @@ package org.apache.usergrid.persistence.index.impl;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.UUID;
 
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -57,6 +55,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
 import static org.apache.usergrid.persistence.index.impl.IndexingUtils.DOC_ID_SEPARATOR_SPLITTER;
@@ -74,23 +73,23 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     private static final Logger log = LoggerFactory.getLogger(EsEntityIndexImpl.class);
 
+    private static final AtomicBoolean mappingsCreated = new AtomicBoolean(false);
+
     private final String indexName;
 
     private final ApplicationScope applicationScope;
 
     private final Client client;
 
-    // Keep track of what types we have already initialized to avoid cost 
-    // of attempting to init them again. Used in the initType() method.
-    private Set<String> knownTypes = new TreeSet<String>();
-
     private final int cursorTimeout;
 
     private final IndexFig config;
 
     @Inject
-    public EsEntityIndexImpl(@Assisted final ApplicationScope appScope, 
-            final IndexFig config, final EsProvider provider) {
+    public EsEntityIndexImpl(
+            @Assisted final ApplicationScope appScope, 
+            final IndexFig config, 
+            final EsProvider provider) {
 
         ValidationUtils.validateApplicationScope(appScope);
 
@@ -101,47 +100,59 @@ public class EsEntityIndexImpl implements EntityIndex {
             this.cursorTimeout = config.getQueryCursorTimeout();
             this.indexName = IndexingUtils.createIndexName(config.getIndexPrefix(), appScope);
 
-        } catch (Exception e) {
-            log.error("Error setting up index", e);
-            throw e;
+            initIndex();
+
+        } catch ( IOException ex ) {
+            throw new RuntimeException("Error initializing ElasticSearch mappings or index", ex);
         }
+    }
 
-        AdminClient admin = client.admin();
-        try {
-            CreateIndexResponse cir = admin.indices().prepareCreate(indexName).execute().actionGet();
-            log.debug("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
 
-            client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+    private void initIndex() throws IOException {
 
-            XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping( 
-                XContentFactory.jsonBuilder(), "_default_");
+        try {
+            if ( !mappingsCreated.getAndSet(true) ) {
+                createMappings();
+            }
 
-            PutIndexTemplateResponse pitr = client.admin().indices()
-                .preparePutTemplate("usergrid_template")
-                .setTemplate(config.getIndexPrefix() + "*")
-                .addMapping("_default_", xcb)
-                .execute()
-                .actionGet();
+            AdminClient admin = client.admin();
+            CreateIndexResponse cir = admin.indices().prepareCreate(indexName).execute().actionGet();
+            log.debug("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
 
-            log.debug("Create Mapping for new Index Name [{}] ACK=[{}]", 
-                    indexName, pitr.isAcknowledged());
+            admin.indices().prepareRefresh(indexName).execute().actionGet();
 
             try {
                 // TODO: figure out what refresh above is not enough to ensure index is ready
                 Thread.sleep(500);
             } catch (InterruptedException ex) {}
 
-        } catch (IndexAlreadyExistsException expected) {
+        } catch ( IndexAlreadyExistsException expected ) {
             // this is expected to happen if index already exists
-
-        } catch ( IOException ioe ) {
-            throw new RuntimeException("Error setting up index", ioe);
         }
     }
 
+
+    /**
+     * Setup ElasticSearch type mappings as a template that applies to all new indexes.
+     * Applies to all indexes that start with our prefix.
+     */
+    private void createMappings() throws IOException {
+
+        XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
+            XContentFactory.jsonBuilder(), "_default_");
+
+        PutIndexTemplateResponse pitr = client.admin().indices()
+            .preparePutTemplate("usergrid_template")
+            .setTemplate(config.getIndexPrefix() + "*") 
+            .addMapping("_default_", xcb) // set mapping as the default for all types
+            .execute()
+            .actionGet();
+    }
+
+
     @Override
     public EntityIndexBatch createBatch() {
-        return new EsEntityIndexBatchImpl(applicationScope, client, config, knownTypes, 1000);
+        return new EsEntityIndexBatchImpl(applicationScope, client, config, 1000);
     }
 
     @Override