You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/16 16:00:12 UTC
[42/50] incubator-usergrid git commit: Fixes to entity created,
version created and delete event handlers.
Fixes to entity created, version created and delete event handlers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7f1d533b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7f1d533b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7f1d533b
Branch: refs/heads/no-source-in-es
Commit: 7f1d533b6ced46307ae76c304fc0efc053e746ca
Parents: bf47eb3
Author: Dave Johnson <dm...@apigee.com>
Authored: Wed Dec 10 09:27:06 2014 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Wed Dec 10 09:27:06 2014 -0500
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 52 ++------------------
.../corepersistence/CpRelationManager.java | 33 ++++++++-----
.../usergrid/corepersistence/GuiceModule.java | 7 ++-
.../events/EntityDeletedHandler.java | 22 ++++++---
.../events/EntityVersionCreatedHandler.java | 34 +++++++------
.../events/EntityVersionDeletedHandler.java | 30 ++++++++---
.../results/FilteringLoader.java | 7 ++-
.../corepersistence/StaleIndexCleanupTest.java | 43 +++++++++++-----
.../usergrid/persistence/CollectionIT.java | 15 ++++--
.../index/impl/EsEntityIndexBatchImpl.java | 6 ++-
.../index/impl/EsEntityIndexImpl.java | 25 +++++-----
11 files changed, 149 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e1b0a73..3ab767c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -607,50 +607,6 @@ public class CpEntityManager implements EntityManager {
if ( entity != null ) {
- // first, delete entity in every collection and connection scope in which it is indexed
-
-// RelationManager rm = getRelationManager( entityRef );
-// Map<String, Map<UUID, Set<String>>> owners = rm.getOwners();
-//
-// logger.debug( "Deleting indexes of all {} collections owning the entity",
-// owners.keySet().size() );
-//
-// final EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
-//
-// final EntityIndexBatch batch = ei.createBatch();
-//
-// for ( String ownerType : owners.keySet() ) {
-// Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType );
-//
-// for ( UUID uuid : collectionsByUuid.keySet() ) {
-// Set<String> collectionNames = collectionsByUuid.get( uuid );
-// for ( String coll : collectionNames ) {
-//
-// IndexScope indexScope = new IndexScopeImpl(
-// new SimpleId( uuid, ownerType ),
-// CpNamingUtils.getCollectionScopeNameFromCollectionName( coll ) );
-//
-//
-// batch.index( indexScope, entity );
-// }
-// }
-// }
-//
-// // deindex from default index scope
-// IndexScope defaultIndexScope = new IndexScopeImpl(
-// getApplicationScope().getApplication(),
-// CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-//
-// batch.deindex(defaultIndexScope, entity );
-//
-// IndexScope allTypesIndexScope = new IndexScopeImpl(
-// getApplicationScope().getApplication(),
-// CpNamingUtils.ALL_TYPES);
-//
-// batch.deindex( allTypesIndexScope, entity );
-//
-// batch.execute();
-
decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) );
// and finally...
@@ -2010,10 +1966,10 @@ public class CpEntityManager implements EntityManager {
for ( Query.CounterFilterPredicate filter : filters ) {
CounterUtils.AggregateCounterSelection selection =
- new CounterUtils.AggregateCounterSelection( filter.getName(),
- getUuid( getUserByIdentifier( filter.getUser() ) ),
- getUuid( getGroupByIdentifier( filter.getGroup() ) ),
- org.apache.usergrid.mq.Queue.getQueueId( filter.getQueue() ), filter.getCategory() );
+ new CounterUtils.AggregateCounterSelection( filter.getName(),
+ getUuid( getUserByIdentifier( filter.getUser() ) ),
+ getUuid( getGroupByIdentifier( filter.getGroup() ) ),
+ org.apache.usergrid.mq.Queue.getQueueId( filter.getQueue() ), filter.getCategory() );
selections.put( selection.getRow( resolution ), selection );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index dcb4ba1..135688e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -220,7 +220,8 @@ public class CpRelationManager implements RelationManager {
this.indexBucketLocator = indexBucketLocator; // TODO: this also
// load the Core Persistence version of the head entity as well
- this.headEntityScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), headEntity.getType());
+ this.headEntityScope = getCollectionScopeNameFromEntityType(
+ applicationScope.getApplication(), headEntity.getType());
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}",
@@ -410,16 +411,16 @@ public class CpRelationManager implements RelationManager {
if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
String collName = CpNamingUtils.getCollectionName( edge.getType() );
- indexScope =
- new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+ indexScope = new IndexScopeImpl(
+ new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
}
else {
String connName = CpNamingUtils.getConnectionType( edge.getType() );
- indexScope =
- new IndexScopeImpl( new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
- CpNamingUtils.getConnectionScopeName( connName ) );
+ indexScope = new IndexScopeImpl(
+ new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
+ CpNamingUtils.getConnectionScopeName( connName ) );
}
entityIndexBatch.index( indexScope, cpEntity );
@@ -604,7 +605,8 @@ public class CpRelationManager implements RelationManager {
public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack )
throws Exception {
- CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(
+ applicationScope.getApplication(), itemRef.getType());
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
org.apache.usergrid.persistence.model.entity.Entity memberEntity =
@@ -637,7 +639,8 @@ public class CpRelationManager implements RelationManager {
}
// load the new member entity to be added to the collection from its default scope
- CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(
+ applicationScope.getApplication(), itemRef.getType());
//TODO, this double load should disappear once events are in
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
@@ -789,7 +792,8 @@ public class CpRelationManager implements RelationManager {
}
// load the entity to be removed to the collection
- CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(
+ applicationScope.getApplication(), itemRef.getType());
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading entity to remove from collection "
@@ -1005,7 +1009,8 @@ public class CpRelationManager implements RelationManager {
ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
- CollectionScope targetScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), connectedEntityRef.getType());
+ CollectionScope targetScope = getCollectionScopeNameFromEntityType(
+ applicationScope.getApplication(), connectedEntityRef.getType());
if ( logger.isDebugEnabled() ) {
logger.debug("createConnection(): "
@@ -1333,7 +1338,8 @@ public class CpRelationManager implements RelationManager {
headEntity = em.validate( headEntity );
- IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(), CpNamingUtils.getConnectionScopeName( connectionType ) );
+ IndexScope indexScope = new IndexScopeImpl(
+ cpHeadEntity.getId(), CpNamingUtils.getConnectionScopeName( connectionType ) );
final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType );
@@ -1564,7 +1570,8 @@ public class CpRelationManager implements RelationManager {
* @param crs Candidates to be considered for results
* @param collName Name of collection or null if querying all types
*/
- private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs, final String collName ) {
+ private Results buildResults( final IndexScope indexScope, final Query query,
+ final CandidateResults crs, final String collName ) {
logger.debug( "buildResults() for {} from {} candidates", collName, crs.size() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 2ff6c4f..ed6cba2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -48,6 +48,8 @@ public class GuiceModule extends AbstractModule {
private EntityManagerFactory emf;
+ public static final String EVENTS_DISABLED = "corepersistence.events.disabled";
+
GuiceModule( EntityManagerFactory emf ) {
this.emf = emf;
}
@@ -68,10 +70,11 @@ public class GuiceModule extends AbstractModule {
bind(ManagerCache.class).to( CpManagerCache.class );
- Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+ Multibinder<DataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), DataMigration.class );
dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
- //dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
+ dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
Multibinder<EntityDeleted> entityBinder =
Multibinder.newSetBinder(binder(), EntityDeleted.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 865e284..7d2ab2d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -24,6 +24,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import java.util.UUID;
import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import static org.apache.usergrid.corepersistence.GuiceModule.EVENTS_DISABLED;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.slf4j.Logger;
@@ -39,20 +40,27 @@ public class EntityDeletedHandler implements EntityDeleted {
@Inject
EntityManagerFactory emf;
- public EntityDeletedHandler() {
- logger.debug("Created");
- }
@Override
public void deleted(CollectionScope scope, Id entityId, UUID version) {
- logger.debug("Entering deleted for entity {}:{} v {} "
+ // This check is for testing purposes and for a test that to be able to dynamically turn
+ // off and on delete previous versions so that it can test clean-up on read.
+ if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+ return;
+ }
+
+ logger.debug("Handling deleted event for entity {}:{} v {} "
+ "scope\n name: {}\n owner: {}\n app: {}",
- new Object[] { entityId.getType(), entityId.getUuid(), version,
- scope.getName(), scope.getOwner(), scope.getApplication()});
+ new Object[] {
+ entityId.getType(),
+ entityId.getUuid(),
+ version,
+ scope.getName(),
+ scope.getOwner(),
+ scope.getApplication()});
CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-
final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
ei.deleteAllVersionsOfEntity( entityId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index 68c5547..a797fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import static org.apache.usergrid.corepersistence.GuiceModule.EVENTS_DISABLED;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
@@ -41,27 +42,28 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
EntityManagerFactory emf;
- public EntityVersionCreatedHandler() {
- logger.debug("EntityVersionCreated");
- }
-
-
@Override
public void versionCreated( final CollectionScope scope, final Entity entity ) {
- logger.debug("Entering deleted for entity {}:{} v {} "
- + "scope\n name: {}\n owner: {}\n app: {}",
- new Object[] { entity.getId().getType(), entity.getId().getUuid(), entity.getVersion(),
- scope.getName(), scope.getOwner(), scope.getApplication()});
-
- CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
-
- final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
// This check is for testing purposes and for a test that to be able to dynamically turn
// off and on delete previous versions so that it can test clean-up on read.
- if ( System.getProperty( "allow.stale.entities", "false" ).equals( "false" )) {
-
- ei.deletePreviousVersions( entity.getId(), entity.getVersion() );
+ if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+ return;
}
+
+ logger.debug("Handling versionCreated for entity {}:{} v {} "
+ + "scope\n name: {}\n owner: {}\n app: {}",
+ new Object[] {
+ entity.getId().getType(),
+ entity.getId().getUuid(),
+ entity.getVersion(),
+ scope.getName(),
+ scope.getOwner(),
+ scope.getApplication()});
+
+ CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
+ final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
+
+ ei.deletePreviousVersions( entity.getId(), entity.getVersion() );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 23097c4..bfffc26 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -19,8 +19,8 @@ package org.apache.usergrid.corepersistence.events;
import com.google.inject.Inject;
import java.util.List;
-import java.util.UUID;
import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
+import static org.apache.usergrid.corepersistence.GuiceModule.EVENTS_DISABLED;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccEntity;
@@ -32,6 +32,8 @@ import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -42,20 +44,35 @@ import rx.schedulers.Schedulers;
* If we do need it then it should be wired in via GuiceModule in the corepersistence package.
*/
public class EntityVersionDeletedHandler implements EntityVersionDeleted {
+ private static final Logger logger = LoggerFactory.getLogger(EntityVersionDeletedHandler.class );
- private final SerializationFig serializationFig;
+ @Inject
+ private SerializationFig serializationFig;
+ @Inject
private EntityManagerFactory emf;
- @Inject
- public EntityVersionDeletedHandler(SerializationFig fig, EntityManagerFactory emf) {
- this.serializationFig = fig;
- }
@Override
public void versionDeleted(
final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+ // This check is for testing purposes and for a test that to be able to dynamically turn
+ // off and on delete previous versions so that it can test clean-up on read.
+ if ( System.getProperty( EVENTS_DISABLED, "false" ).equals( "true" )) {
+ return;
+ }
+
+ logger.debug("Handling versionDeleted count={} event for entity {}:{} v {} "
+ + "scope\n name: {}\n owner: {}\n app: {}",
+ new Object[] {
+ entityVersions.size(),
+ entityId.getType(),
+ entityId.getUuid(),
+ scope.getName(),
+ scope.getOwner(),
+ scope.getApplication()});
+
CpEntityManagerFactory cpemf = (CpEntityManagerFactory)emf;
final EntityIndex ei = cpemf.getManagerCache().getEntityIndex(scope);
@@ -66,6 +83,7 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
scope.getName()
);
+
rx.Observable.from(entityVersions)
.subscribeOn(Schedulers.io())
.buffer(serializationFig.getBufferSize())
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index c1d42ea..dca59e0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -71,7 +71,12 @@ public class FilteringLoader implements ResultsLoader {
* @param applicationScope The application scope to perform the load
* @param indexScope The index scope used in the search
*/
- protected FilteringLoader( final ManagerCache managerCache, final ResultsVerifier resultsVerifier, final ApplicationScope applicationScope, final IndexScope indexScope ) {
+ protected FilteringLoader(
+ final ManagerCache managerCache,
+ final ResultsVerifier resultsVerifier,
+ final ApplicationScope applicationScope,
+ final IndexScope indexScope ) {
+
this.managerCache = managerCache;
this.resultsVerifier = resultsVerifier;
this.applicationScope = applicationScope;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index f159146..5b9db6e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.index.EntityIndex;
@@ -53,6 +52,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.fasterxml.uuid.UUIDComparator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.usergrid.corepersistence.GuiceModule.EVENTS_DISABLED;
import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
@@ -62,7 +62,6 @@ import static org.junit.Assert.assertTrue;
import org.junit.Before;
-//need to create system properties in test that can get applied
/**
* Test on read style clean-up of stale ElasticSearch indexes.
*/
@@ -84,7 +83,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
@After
public void after() {
- System.clearProperty( "allow.stale.entities" );
+ System.clearProperty( EVENTS_DISABLED );
}
@@ -94,6 +93,9 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
@Test
public void testUpdateVersioning() throws Exception {
+ // turn off post processing stuff that cleans up stale entities
+ System.setProperty( EVENTS_DISABLED, "true" );
+
final EntityManager em = app.getEntityManager();
Entity thing = em.create( "thing", new HashMap<String, Object>() {{
@@ -131,7 +133,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
logger.info( "Started testStaleIndexCleanup()" );
- // TODO: turn off post processing stuff that cleans up stale entities
+ // turn off post processing stuff that cleans up stale entities
+ System.setProperty( EVENTS_DISABLED, "true" );
final EntityManager em = app.getEntityManager();
@@ -160,9 +163,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
List<Entity> maxVersions = new ArrayList<>( numEntities );
-
-
-
for ( Entity thing : things ) {
Entity toUpdate = null;
@@ -174,7 +174,6 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
toUpdate.setProperty( "updateCount", updateCount.getAndIncrement() );
em.update( toUpdate );
-
count++;
if ( count % 100 == 0 ) {
logger.info( "Updated {} of {} times", count, numEntities * numUpdates );
@@ -249,7 +248,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
logger.info("Started testStaleIndexCleanup()");
// turn off post processing stuff that cleans up stale entities
- System.setProperty( "allow.stale.entities", "true" );
+ System.setProperty( EVENTS_DISABLED, "true" );
final EntityManager em = app.getEntityManager();
@@ -299,18 +298,21 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
crs = queryCollectionCp("things", "thing", "select *");
Assert.assertEquals( "Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
+ // turn ON post processing stuff that cleans up stale entities
+ System.setProperty( EVENTS_DISABLED, "false" );
+
// delete all entities
for ( Entity thing : things ) {
em.delete( thing );
}
- em.refreshIndex();
// wait for indexes to be cleared for the deleted entities
count = 0;
do {
Thread.sleep(100);
crs = queryCollectionCp("things", "thing", "select *");
- } while ( crs.size() > 0 && count++ < 14 );
+ em.refreshIndex();
+ } while ( crs.size() > 0 && count++ < 15 );
Assert.assertEquals( "Expect no candidates", 0, crs.size() );
}
@@ -325,6 +327,9 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
logger.info( "Started testCleanupOnUpdate()" );
+ // turn off post processing stuff that cleans up stale entities
+ System.setProperty( EVENTS_DISABLED, "true" );
+
final EntityManager em = app.getEntityManager();
final int numEntities = 10;
@@ -341,9 +346,12 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}
em.refreshIndex();
- CandidateResults crs = queryCollectionCp( "things", "things", "select *");
+ CandidateResults crs = queryCollectionCp( "things", "thing", "select *");
Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
+ // turn off post processing stuff that cleans up stale entities
+ System.setProperty( EVENTS_DISABLED, "false" );
+
// update each one a bunch of times
int count = 0;
@@ -369,6 +377,14 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
}
em.refreshIndex();
+ // wait for indexes to be cleared for the deleted entities
+ count = 0;
+ do {
+ Thread.sleep(100);
+ crs = queryCollectionCp("things", "thing", "select *");
+ em.refreshIndex();
+ } while ( crs.size() > 0 && count++ < 15 );
+
// query Core Persistence directly for total number of result candidates
crs = queryCollectionCp("things", "thing", "select *");
Assert.assertEquals( "Expect candidates without earlier stale entities", numEntities, crs.size() );
@@ -383,7 +399,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
EntityManager em = app.getEntityManager();
- CollectionScope cs = getCollectionScopeNameFromEntityType( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
+ CollectionScope cs = getCollectionScopeNameFromEntityType(
+ new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
EntityCollectionManagerFactory ecmf =
CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 23983e1..b24bd31 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -934,10 +934,10 @@ public class
EntityManager em = app.getEntityManager();
assertNotNull( em );
- int size = 20;
+ int initialSize = 20;
List<UUID> entityIds = new ArrayList<UUID>();
- for ( int i = 0; i < size; i++ ) {
+ for ( int i = 0; i < initialSize; i++ ) {
Map<String, Object> properties = new LinkedHashMap<String, Object>();
properties.put( "name", "object" + i );
Entity created = em.create( "objects", properties );
@@ -954,28 +954,33 @@ public class
LOG.info( JsonUtils.mapToFormattedJsonString( r.getEntities() ) );
- assertEquals( size, r.size() );
+ assertEquals(initialSize, r.size() );
// check they're all the same before deletion
- for ( int i = 0; i < size; i++ ) {
+ for ( int i = 0; i < initialSize; i++ ) {
assertEquals( entityIds.get( i ), r.getEntities().get( i ).getUuid() );
}
// now delete 5 items that will span the 10 pages
+ int numDeleted = 0;
for ( int i = 5; i < 10; i++ ) {
Entity entity = r.getEntities().get( i );
em.delete( entity );
entityIds.remove( entity.getUuid() );
+ numDeleted++;
}
em.refreshIndex();
+ // wait for indexes to be cleared
+ Thread.sleep( 500 );
+
// now query with paging
query = new Query();
r = em.searchCollection( em.getApplicationRef(), "objects", query );
- assertEquals( 10, r.size() );
+ assertEquals( query.getLimit(), r.size() );
for ( int i = 0; i < 10; i++ ) {
assertEquals( entityIds.get( i ), r.getEntities().get( i ).getUuid() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 2f555ed..6c049fc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -143,7 +143,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "Indexing entity documentId {} data {} ", indexId, entityAsMap );
- bulkRequest.add( client.prepareIndex(alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
+ bulkRequest.add( client.prepareIndex(
+ alias.getWriteAlias(), entityType, indexId ).setSource( entityAsMap ) );
maybeFlush();
@@ -181,7 +182,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
log.debug( "De-indexing type {} with documentId '{}'", entityType, indexId );
- bulkRequest.add( client.prepareDelete(alias.getWriteAlias(), entityType, indexId ).setRefresh( refresh ) );
+ bulkRequest.add( client.prepareDelete(
+ alias.getWriteAlias(), entityType, indexId ).setRefresh( refresh ) );
log.debug( "Deindexed Entity with index id " + indexId );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7f1d533b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 51ee915..1ae2d6a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -233,7 +233,7 @@ public class EsEntityIndexImpl implements EntityIndex {
// to receive documents. Occasionally we see errors.
// See this post: http://s.apache.org/index-missing-exception
- logger.info( "Refreshing Created new Index Name [{}]", alias);
+ logger.debug( "Testing new index name [{}]", alias);
final RetryOperation retryOperation = new RetryOperation() {
@Override
@@ -438,9 +438,6 @@ public class EsEntityIndexImpl implements EntityIndex {
public void refresh() {
-
- logger.info( "Refreshing Created new Index Name [{}]", alias);
-
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
@@ -452,7 +449,7 @@ public class EsEntityIndexImpl implements EntityIndex {
return true;
}
catch ( IndexMissingException e ) {
- logger.error( "Unable to refresh index after create. Waiting before sleeping.", e );
+ logger.error( "Unable to refresh index. Waiting before sleeping.", e );
throw e;
}
}
@@ -510,8 +507,9 @@ public class EsEntityIndexImpl implements EntityIndex {
@Override
public void deleteAllVersionsOfEntity( Id entityId ) {
- final TermQueryBuilder tqb = QueryBuilders.termQuery(
- ENTITYID_ID_FIELDNAME, entityId.getUuid().toString().toLowerCase() );
+ String idString = IndexingUtils.idString( entityId ).toLowerCase();
+
+ final TermQueryBuilder tqb = QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString );
final DeleteByQueryResponse response = esProvider.getClient()
.prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute().actionGet();
@@ -526,11 +524,14 @@ public class EsEntityIndexImpl implements EntityIndex {
@Override
- public void deletePreviousVersions( final Id id, final UUID version ) {
+ public void deletePreviousVersions( final Id entityId, final UUID version ) {
+
+ String idString = IndexingUtils.idString( entityId ).toLowerCase();
final FilteredQueryBuilder fqb = QueryBuilders.filteredQuery(
- QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, id.getUuid().toString().toLowerCase()),
- FilterBuilders.rangeFilter( ENTITY_VERSION_FIELDNAME ).lt( version.timestamp() ) );
+ QueryBuilders.termQuery( ENTITYID_ID_FIELDNAME, idString ),
+ FilterBuilders.rangeFilter( ENTITY_VERSION_FIELDNAME ).lt( version.timestamp() )
+ );
final DeleteByQueryResponse response = esProvider.getClient()
.prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( fqb ).execute().actionGet();
@@ -538,14 +539,14 @@ public class EsEntityIndexImpl implements EntityIndex {
//error message needs to be retooled so that it describes the entity more throughly
logger.debug( "Deleted entity {}:{} with version {} from all "
+ "index scopes with response status = {}",
- new Object[] { id.getType(), id.getUuid(), version, response.status().toString() } );
+ new Object[] { entityId.getType(), entityId.getUuid(), version, response.status().toString() } );
checkDeleteByQueryResponse( fqb, response );
}
/**
- * Validate the response doens't contain errors, if it does, fail fast at the first error we encounter
+ * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
*/
private void checkDeleteByQueryResponse(
final QueryBuilder query, final DeleteByQueryResponse response ) {