You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/09 21:28:09 UTC
git commit: Only init ElasticSearch mappings once on startup,
also: some tweaks to the stale index test.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o c0edbe191 -> 3fdc74f52
Only init ElasticSearch mappings once on startup, also: some tweaks to the stale index test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3fdc74f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3fdc74f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3fdc74f5
Branch: refs/heads/two-dot-o
Commit: 3fdc74f52294b890310020f2fcc77e183dbf7cd2
Parents: c0edbe1
Author: Dave Johnson <dm...@apigee.com>
Authored: Thu Oct 9 15:27:52 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Thu Oct 9 15:27:52 2014 -0400
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 3 -
.../corepersistence/StaleIndexCleanupTest.java | 51 +++++++++-----
.../index/impl/EsEntityIndexBatchImpl.java | 16 ++---
.../index/impl/EsEntityIndexImpl.java | 73 +++++++++++---------
4 files changed, 79 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 1a8f17c..0cf89b8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -494,9 +494,6 @@ public class CpEntityManager implements EntityManager {
cpEntity, entity.getProperties(), entity.getType(), true );
try {
- logger.debug("About to Write {}:{} version {}", new Object[] {
- cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
-
cpEntity = ecm.update( cpEntity ).toBlockingObservable().last();
cpEntity = ecm.load( entityId ).toBlockingObservable().last();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index c5d5782..eab82b1 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -54,10 +54,14 @@ import org.slf4j.LoggerFactory;
public class StaleIndexCleanupTest extends AbstractCoreIT {
private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
- private static final long writeDelayMs = 80;
- //private static final long readDelayMs = 7;
+ // take it easy on embedded Cassandra
+ private static final long writeDelayMs = 50;
+ private static final long readDelayMs = 50;
+ /**
+ * Test that updating an entity causes the entity's version number to change.
+ */
@Test
public void testUpdateVersioning() throws Exception {
@@ -82,15 +86,17 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
assertEquals( "widget", cpUpdated.getField("stuff").getValue());
UUID newVersion = cpUpdated.getVersion();
- // this assertion fails
assertTrue( "New version is greater than old",
UUIDComparator.staticCompare( newVersion, oldVersion ) > 0 );
- // this fails too
assertEquals( 2, queryCollectionCp("things", "select *").size() );
}
+ /**
+ * Test that the CpRelationManager cleans up and stale indexes that it finds when it is
+ * building search results.
+ */
@Test
public void testStaleIndexCleanup() throws Exception {
@@ -100,8 +106,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
final EntityManager em = app.getEntityManager();
- int numEntities = 100;
- int numUpdates = 10;
+ final int numEntities = 100;
+ final int numUpdates = 5;
// create lots of entities
final List<Entity> things = new ArrayList<Entity>();
@@ -115,7 +121,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
em.refreshIndex();
CandidateResults crs = queryCollectionCp( "things", "select *");
- Assert.assertEquals( numEntities, crs.size() );
+ Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
// update each one a bunch of times
int count = 0;
@@ -128,34 +134,43 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
em.update(toUpdate);
Thread.sleep( writeDelayMs );
- em.refreshIndex();
- count++;
+ count++;
if ( count % 100 == 0 ) {
logger.info("Updated {} of {} times", count, numEntities * numUpdates);
}
}
}
+ em.refreshIndex();
// query Core Persistence directly for total number of result candidates
- // should be entities X updates because of stale indexes
crs = queryCollectionCp("things", "select *");
- Assert.assertEquals( numEntities * numUpdates, crs.size() );
+ Assert.assertEquals( "Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
- // query EntityManager for results
- // should return 100 becuase it filters out the stale entities
+ // query EntityManager for results and page through them
+ // should return numEntities becuase it filters out the stale entities
Query q = Query.fromQL("select *");
- q.setLimit( 10000 );
- Results results = em.searchCollection( em.getApplicationRef(), "things", q);
- assertEquals( numEntities, results.size() );
+ q.setLimit( 8 );
+ int thingCount = 0;
+ String cursor = null;
+ do {
+ Results results = em.searchCollection( em.getApplicationRef(), "things", q);
+ cursor = results.getCursor();
+ if ( cursor != null ) {
+ assertEquals( 8, results.size() );
+ }
+ thingCount += results.size();
+
+ } while ( cursor != null );
+ assertEquals( "Expect no stale candidates", numEntities, thingCount );
// EntityManager should have kicked off a batch cleanup of those stale indexes
// wait a second for batch cleanup to complete
Thread.sleep(600);
- // query for total number of result candidates = 100
+ // query for total number of result candidates = numEntities
crs = queryCollectionCp("things", "select *");
- Assert.assertEquals( numEntities, crs.size() );
+ Assert.assertEquals( "Expect stale candidates de-indexed", numEntities, crs.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 9e70a00..56fb7b7 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -58,7 +58,6 @@ import org.apache.usergrid.persistence.model.field.UUIDField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
import com.google.common.base.Joiner;
-import java.io.IOException;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
@@ -69,10 +68,6 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PR
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createCollectionScopeTypeName;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexName;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.indices.IndexAlreadyExistsException;
public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -83,10 +78,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final Client client;
- // Keep track of what types we have already initialized to avoid cost
- // of attempting to init them again. Used in the initType() method.
- private final Set<String> knownTypes;
-
private final boolean refresh;
private final String indexName;
@@ -99,12 +90,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
public EsEntityIndexBatchImpl(
- final ApplicationScope applicationScope, final Client client, final IndexFig config,
- final Set<String> knownTypes, final int autoFlushSize ) {
+ final ApplicationScope applicationScope,
+ final Client client,
+ final IndexFig config,
+ final int autoFlushSize ) {
this.applicationScope = applicationScope;
this.client = client;
- this.knownTypes = knownTypes;
this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
this.refresh = config.isForcedRefresh();
this.autoFlushSize = autoFlushSize;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3fdc74f5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8c07d6d..8e53e5c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -19,8 +19,6 @@ package org.apache.usergrid.persistence.index.impl;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -57,6 +55,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.DOC_ID_SEPARATOR_SPLITTER;
@@ -74,23 +73,23 @@ public class EsEntityIndexImpl implements EntityIndex {
private static final Logger log = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+ private static final AtomicBoolean mappingsCreated = new AtomicBoolean(false);
+
private final String indexName;
private final ApplicationScope applicationScope;
private final Client client;
- // Keep track of what types we have already initialized to avoid cost
- // of attempting to init them again. Used in the initType() method.
- private Set<String> knownTypes = new TreeSet<String>();
-
private final int cursorTimeout;
private final IndexFig config;
@Inject
- public EsEntityIndexImpl(@Assisted final ApplicationScope appScope,
- final IndexFig config, final EsProvider provider) {
+ public EsEntityIndexImpl(
+ @Assisted final ApplicationScope appScope,
+ final IndexFig config,
+ final EsProvider provider) {
ValidationUtils.validateApplicationScope(appScope);
@@ -101,47 +100,59 @@ public class EsEntityIndexImpl implements EntityIndex {
this.cursorTimeout = config.getQueryCursorTimeout();
this.indexName = IndexingUtils.createIndexName(config.getIndexPrefix(), appScope);
- } catch (Exception e) {
- log.error("Error setting up index", e);
- throw e;
+ initIndex();
+
+ } catch ( IOException ex ) {
+ throw new RuntimeException("Error initializing ElasticSearch mappings or index", ex);
}
+ }
- AdminClient admin = client.admin();
- try {
- CreateIndexResponse cir = admin.indices().prepareCreate(indexName).execute().actionGet();
- log.debug("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
- client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+ private void initIndex() throws IOException {
- XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
- XContentFactory.jsonBuilder(), "_default_");
+ try {
+ if ( !mappingsCreated.getAndSet(true) ) {
+ createMappings();
+ }
- PutIndexTemplateResponse pitr = client.admin().indices()
- .preparePutTemplate("usergrid_template")
- .setTemplate(config.getIndexPrefix() + "*")
- .addMapping("_default_", xcb)
- .execute()
- .actionGet();
+ AdminClient admin = client.admin();
+ CreateIndexResponse cir = admin.indices().prepareCreate(indexName).execute().actionGet();
+ log.debug("Created new Index Name [{}] ACK=[{}]", indexName, cir.isAcknowledged());
- log.debug("Create Mapping for new Index Name [{}] ACK=[{}]",
- indexName, pitr.isAcknowledged());
+ admin.indices().prepareRefresh(indexName).execute().actionGet();
try {
// TODO: figure out what refresh above is not enough to ensure index is ready
Thread.sleep(500);
} catch (InterruptedException ex) {}
- } catch (IndexAlreadyExistsException expected) {
+ } catch ( IndexAlreadyExistsException expected ) {
// this is expected to happen if index already exists
-
- } catch ( IOException ioe ) {
- throw new RuntimeException("Error setting up index", ioe);
}
}
+
+ /**
+ * Setup ElasticSearch type mappings as a template that applies to all new indexes.
+ * Applies to all indexes that start with our prefix.
+ */
+ private void createMappings() throws IOException {
+
+ XContentBuilder xcb = IndexingUtils.createDoubleStringIndexMapping(
+ XContentFactory.jsonBuilder(), "_default_");
+
+ PutIndexTemplateResponse pitr = client.admin().indices()
+ .preparePutTemplate("usergrid_template")
+ .setTemplate(config.getIndexPrefix() + "*")
+ .addMapping("_default_", xcb) // set mapping as the default for all types
+ .execute()
+ .actionGet();
+ }
+
+
@Override
public EntityIndexBatch createBatch() {
- return new EsEntityIndexBatchImpl(applicationScope, client, config, knownTypes, 1000);
+ return new EsEntityIndexBatchImpl(applicationScope, client, config, 1000);
}
@Override