You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/12/15 19:32:22 UTC
[38/50] incubator-usergrid git commit: Merge branch 'two-dot-o' into
two-dot-o-events
Merge branch 'two-dot-o' into two-dot-o-events
Conflicts:
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityDeleteListener.java
stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityDeleteListenerTest.java
stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c3261795
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c3261795
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c3261795
Branch: refs/heads/two-dot-o
Commit: c3261795af2e0992ae9aecd8332e733dac42e0a8
Parents: 1884cce bd0a1e9
Author: Dave Johnson <dm...@apigee.com>
Authored: Tue Nov 25 07:31:05 2014 -0500
Committer: Dave Johnson <dm...@apigee.com>
Committed: Tue Nov 25 07:31:05 2014 -0500
----------------------------------------------------------------------
stack/core/pom.xml | 32 +-
.../corepersistence/CpEntityManager.java | 134 +-
.../corepersistence/CpEntityManagerFactory.java | 2 +-
.../corepersistence/CpRelationManager.java | 35 +-
.../usergrid/corepersistence/GuiceModule.java | 1 +
.../migration/EntityDataMigration.java | 147 ++
.../migration/EntityTypeMappingMigration.java | 28 +-
.../migration/GraphShardVersionMigration.java | 104 +-
.../corepersistence/migration/Versions.java | 3 +
.../rx/AllEntitiesInSystemObservable.java | 27 +-
.../rx/ApplicationObservable.java | 65 +-
.../corepersistence/util/CpEntityMapUtils.java | 62 +-
.../corepersistence/util/CpNamingUtils.java | 17 +
.../org/apache/usergrid/CoreITSetupImpl.java | 13 +
.../corepersistence/StaleIndexCleanupTest.java | 6 +-
.../migration/EntityDataMigrationIT.java | 252 ++++
.../migration/EntityTypeMappingMigrationIT.java | 74 +-
.../migration/GraphShardVersionMigrationIT.java | 153 +-
.../rx/AllEntitiesInSystemObservableIT.java | 23 +-
.../usergrid/persistence/LargeEntityIT.java | 112 ++
stack/core/src/test/resources/largeentity.json | 1329 ++++++++++++++++++
.../collection/EntityCollectionManager.java | 2 +-
.../exception/EntityTooLargeException.java | 67 +
.../collection/guice/CollectionModule.java | 2 -
.../impl/EntityCollectionManagerImpl.java | 294 ++--
.../collection/impl/EntityDeletedTask.java | 11 +-
.../impl/EntityVersionCleanupTask.java | 13 +-
.../mvcc/MvccEntitySerializationStrategy.java | 23 +-
.../mvcc/MvccLogEntrySerializationStrategy.java | 3 +-
.../mvcc/stage/delete/MarkCommit.java | 19 +-
.../mvcc/stage/write/WriteCommit.java | 3 +-
.../serialization/SerializationFig.java | 15 +-
.../UniqueValueSerializationStrategy.java | 3 +-
.../serialization/impl/EntityRepairImpl.java | 2 +-
.../MvccEntitySerializationStrategyImpl.java | 343 ++---
...vccEntitySerializationStrategyProxyImpl.java | 162 +++
.../MvccEntitySerializationStrategyV1Impl.java | 219 +++
.../MvccEntitySerializationStrategyV2Impl.java | 238 ++++
.../MvccLogEntrySerializationStrategyImpl.java | 2 +-
.../serialization/impl/SerializationModule.java | 25 +-
.../serialization/impl/SettingsValidation.java | 58 +
.../UniqueValueSerializationStrategyImpl.java | 2 +-
.../EntityCollectionManagerFactoryTest.java | 2 +-
.../collection/EntityCollectionManagerIT.java | 64 +-
.../EntityCollectionManagerStressTest.java | 2 +-
.../EntityCollectionManagerSyncIT.java | 2 +-
.../collection/guice/MigrationManagerRule.java | 38 -
.../collection/guice/TestCollectionModule.java | 10 +
.../impl/EntityVersionCleanupTaskTest.java | 11 +-
...niqueValueSerializationStrategyImplTest.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerifyIT.java | 2 +-
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 2 +-
.../serialization/EntityRepairImplTest.java | 2 +-
...MvccEntitySerializationStrategyImplTest.java | 87 +-
...cEntitySerializationStrategyProxyV1Test.java | 85 ++
...cEntitySerializationStrategyProxyV2Test.java | 83 ++
...ccEntitySerializationStrategyV1ImplTest.java | 55 +
...ccEntitySerializationStrategyV2ImplTest.java | 55 +
.../MvccEntitySerializationStrategyV2Test.java | 229 +++
.../impl/MvccLESSTransientTest.java | 55 +-
...ccLogEntrySerializationStrategyImplTest.java | 2 +-
.../impl/SettingsValidationTest.java | 131 ++
.../collection/util/EntityHelper.java | 105 ++
.../src/test/resources/log4j.properties | 21 +-
.../persistence/core/astyanax/CassandraFig.java | 12 +
.../persistence/core/astyanax/FieldBuffer.java | 70 +
.../core/astyanax/FieldBufferBuilder.java | 87 ++
.../core/astyanax/FieldBufferParser.java | 81 ++
.../core/astyanax/FieldBufferSerializer.java | 134 ++
.../core/guicyfig/SetConfigTestBypass.java | 108 ++
.../astyanax/FieldBufferSerializerTest.java | 126 ++
.../core/guice/MaxMigrationModule.java | 39 +
.../core/guice/MaxMigrationVersion.java | 40 +
.../core/guice/MigrationManagerRule.java | 19 +-
.../persistence/graph/GraphManagerIT.java | 2 +-
.../persistence/graph/GraphManagerLoadTest.java | 2 +-
.../graph/GraphManagerShardingIT.java | 2 +-
.../graph/GraphManagerStressTest.java | 2 +-
.../usergrid/persistence/graph/SimpleTest.java | 2 +-
.../graph/guice/TestGraphModule.java | 11 +
.../graph/impl/EdgeDeleteListenerTest.java | 2 +-
.../graph/impl/NodeDeleteListenerTest.java | 2 +-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 2 +-
.../graph/impl/stage/EdgeMetaRepairTest.java | 2 +-
.../EdgeMetadataSerializationTest.java | 2 +-
.../EdgeSerializationChopTest.java | 2 +-
.../serialization/EdgeSerializationTest.java | 2 +-
.../serialization/NodeSerializationTest.java | 2 +-
.../impl/shard/EdgeShardSerializationTest.java | 2 +-
.../NodeShardCounterSerializationTest.java | 2 +-
.../persistence/map/MapManagerTest.java | 2 +-
.../persistence/model/field/LocationField.java | 3 -
.../index/impl/EntityIndexMapUtils.java | 4 +-
.../persistence/queue/QueueManagerTest.java | 26 +-
.../collection/groups/GroupResourceIT.java | 182 +--
.../applications/queries/GeoPagingTest.java | 41 +-
.../applications/queries/basicGeoTests.java | 184 +++
97 files changed, 5219 insertions(+), 1150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 5a193e5,b2d854b..c3e5bbe
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@@ -69,7 -69,7 +69,6 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerifyException;
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
--import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.entities.Application;
@@@ -214,23 -215,23 +214,22 @@@ public class CpEntityManager implement
// set to false for now
this.skipAggregateCounters = false;
-- int entityCacheSize =
-- Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_size", "100" ) );
++ int entityCacheSize = Integer.parseInt(
++ cass.getProperties().getProperty( "usergrid.entity_cache_size", "100" ) );
-- int entityCacheTimeout =
-- Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_timeout_ms", "500" ) );
++ int entityCacheTimeout = Integer.parseInt(
++ cass.getProperties().getProperty( "usergrid.entity_cache_timeout_ms", "500" ) );
this.entityCache = CacheBuilder.newBuilder().maximumSize( entityCacheSize )
-- .expireAfterWrite( entityCacheTimeout, TimeUnit.MILLISECONDS )
-- .build( new CacheLoader<EntityScope, org.apache.usergrid.persistence.model
-- .entity.Entity>() {
-- public org.apache.usergrid.persistence.model.entity.Entity load(
-- EntityScope es ) {
-- return managerCache.getEntityCollectionManager( es.scope )
-- .load( es.entityId ).toBlocking()
-- .lastOrDefault( null );
-- }
-- } );
++ .expireAfterWrite(entityCacheTimeout, TimeUnit.MILLISECONDS)
++ .build(new CacheLoader<EntityScope, org.apache.usergrid.persistence.model.entity.Entity>() {
++ public org.apache.usergrid.persistence.model.entity.Entity load(
++ EntityScope es) {
++ return managerCache.getEntityCollectionManager(es.scope)
++ .load(es.entityId).toBlocking()
++ .lastOrDefault(null);
++ }
++ });
}
@@@ -347,8 -348,8 +346,8 @@@
*/
@Metered( group = "core", name = "EntityManager_create" )
@TraceParticipant
-- public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties,
-- UUID importId ) throws Exception {
++ public <A extends Entity> A create( String entityType, Class<A> entityClass,
++ Map<String, Object> properties, UUID importId ) throws Exception {
UUID timestampUuid = importId != null ? importId : UUIDUtils.newTimeUUID();
@@@ -372,13 -373,9 +371,10 @@@
}
Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entityRef.getType());
++ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++ applicationScope.getApplication(), entityRef.getType());
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
// throw new IllegalArgumentException(
@@@ -390,10 -387,10 +386,10 @@@
if ( cpEntity == null ) {
if ( logger.isDebugEnabled() ) {
logger.debug( "FAILED to load entity {}:{} from scope\n app {}\n owner {}\n name {}",
-- new Object[] {
-- id.getType(), id.getUuid(), collectionScope.getApplication(),
-- collectionScope.getOwner(), collectionScope.getName()
-- } );
++ new Object[] {
++ id.getType(), id.getUuid(), collectionScope.getApplication(),
++ collectionScope.getOwner(), collectionScope.getName()
++ } );
}
return null;
}
@@@ -457,13 -454,10 +453,11 @@@
String type = Schema.getDefaultSchema().getEntityType( entityClass );
Id id = new SimpleId( entityId, type );
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), type);
++ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++ applicationScope.getApplication(), type);
+
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
// throw new IllegalArgumentException(
@@@ -522,18 -516,17 +516,22 @@@
public void update( Entity entity ) throws Exception {
// first, update entity index in its own collection scope
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
+
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entity.getType());
++ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++ applicationScope.getApplication(), entity.getType());
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
if ( logger.isDebugEnabled() ) {
-- logger.debug( "Updating entity {}:{} from scope\n app {}\n owner {}\n name {}", new Object[] {
-- entityId.getType(), entityId.getUuid(), collectionScope.getApplication(),
-- collectionScope.getOwner(), collectionScope.getName()
-- } );
++ logger.debug( "Updating entity {}:{} from scope\n app {}\n owner {}\n name {}",
++ new Object[] {
++ entityId.getType(),
++ entityId.getUuid(),
++ collectionScope.getApplication(),
++ collectionScope.getOwner(),
++ collectionScope.getName()
++ } );
}
// if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@@ -590,9 -583,7 +588,8 @@@
private Observable deleteAsync( EntityRef entityRef ) throws Exception {
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entityRef.getType() );
++ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++ applicationScope.getApplication(), entityRef.getType() );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@@ -866,9 -857,9 +863,8 @@@
}
-- private Iterable<EntityRef> getEntityRefsForUniqueProperty( String collName, String propName, String alias )
-- throws Exception {
--
++ private Iterable<EntityRef> getEntityRefsForUniqueProperty(
++ String collName, String propName, String alias ) throws Exception {
final Id id = getIdForUniqueEntityField( collName, propName, alias );
@@@ -900,15 -891,15 +896,17 @@@
get( entityRef ).getType();
}
catch ( Exception e ) {
-- logger.error( "Unable to load entity " + entityRef.getType() + ":" + entityRef.getUuid(), e );
++ logger.error( "Unable to load entity " + entityRef.getType()
++ + ":" + entityRef.getUuid(), e );
}
if ( entityRef == null ) {
-- throw new EntityNotFoundException( "Entity " + entityId.toString() + " cannot be verified" );
++ throw new EntityNotFoundException(
++ "Entity " + entityId.toString() + " cannot be verified" );
}
if ( ( entityType != null ) && !entityType.equalsIgnoreCase( entityRef.getType() ) ) {
throw new UnexpectedEntityTypeException(
-- "Entity " + entityId + " is not the expected type, expected " + entityType + ", found "
-- + entityRef.getType() );
++ "Entity " + entityId + " is not the expected type, expected "
++ + entityType + ", found " + entityRef.getType() );
}
}
return entityRef;
@@@ -924,7 -915,7 +922,8 @@@
@Override
-- public List<Entity> getPartialEntities( Collection<UUID> ids, Collection<String> properties ) throws Exception {
++ public List<Entity> getPartialEntities(
++ Collection<UUID> ids, Collection<String> properties ) throws Exception {
throw new UnsupportedOperationException( "Not supported yet." );
}
@@@ -938,15 -929,15 +937,16 @@@
@Override
-- public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
++ public void setProperty(
++ EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
setProperty( entityRef, propertyName, propertyValue, false );
}
@Override
-- public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
-- throws Exception {
++ public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue,
++ boolean override ) throws Exception {
if ( ( propertyValue instanceof String ) && ( ( String ) propertyValue ).equals( "" ) ) {
propertyValue = null;
@@@ -954,8 -945,8 +954,8 @@@
Entity entity = get( entityRef );
-- propertyValue =
-- Schema.getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
++ propertyValue = Schema.getDefaultSchema().validateEntityPropertyValue(
++ entity.getType(), propertyName, propertyValue );
entity.setProperty( propertyName, propertyValue );
entity.setProperty( PROPERTY_MODIFIED, UUIDUtils.getTimestampInMillis( UUIDUtils.newTimeUUID() ) );
@@@ -1010,15 -1001,10 +1010,12 @@@
@Override
public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
- CollectionScope collectionScope = getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), entityRef.getType());
+
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
-
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
++ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(
++ getApplicationScope().getApplication(), entityRef.getType());
IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+ getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
@@@ -1062,15 -1048,15 +1059,16 @@@
@Override
-- public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception {
++ public void addToDictionary( EntityRef entityRef, String dictionaryName,
++ Object elementValue ) throws Exception {
addToDictionary( entityRef, dictionaryName, elementValue, null );
}
@Override
-- public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementName, Object elementValue )
-- throws Exception {
++ public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementName,
++ Object elementValue ) throws Exception {
if ( elementName == null ) {
return;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index a57158f,018a9b7..2ff6c4f
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@@ -18,15 -19,10 +18,16 @@@ package org.apache.usergrid.corepersist
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
+ import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
+import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
+import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
+import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index e1c0214,9d0c9e6..f159146
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@@ -51,17 -51,13 +51,18 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.fasterxml.uuid.UUIDComparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import org.junit.Before;
+//need to create system properties in test that can get applied
/**
* Test on read style clean-up of stale ElasticSearch indexes.
*/
@@@ -382,13 -221,9 +383,10 @@@ public class StaleIndexCleanupTest exte
EntityManager em = app.getEntityManager();
- CollectionScope cs = new CollectionScopeImpl(
- new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
- new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
- CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) );
+ CollectionScope cs = getCollectionScopeNameFromEntityType( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
- EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
+ EntityCollectionManagerFactory ecmf =
+ CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
EntityCollectionManager ecm = ecmf.createCollectionManager( cs );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index e47d460,534d7a6..4de18fe
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@@ -43,7 -45,7 +43,7 @@@ public interface EntityCollectionManage
/**
* MarkCommit the entity and remove it's indexes with the given entity id
*/
-- public Observable<Void> delete( Id entityId );
++ public Observable<Id> delete( Id entityId );
/**
* Load the entity with the given entity Id
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 4c96303,8015ca9..1c3e258
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@@ -18,8 -18,6 +18,7 @@@
package org.apache.usergrid.persistence.collection.guice;
- import java.util.Collections;
+
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@@ -50,9 -43,7 +49,8 @@@ import com.google.inject.Inject
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-
+import com.google.inject.multibindings.Multibinder;
- import java.util.List;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 437c0d3,d54c5f7..a89924a
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@@ -60,10 -65,6 +65,12 @@@ import com.netflix.astyanax.connectionp
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
++import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
++import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
- import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
++import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
- import org.apache.usergrid.persistence.core.util.Health;
import rx.Observable;
import rx.Subscriber;
@@@ -109,24 -106,16 +117,26 @@@ public class EntityCollectionManagerImp
@Inject
- public EntityCollectionManagerImpl(
- @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
- final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit, final RollbackAction rollback,
- final MarkStart markStart, final MarkCommit markCommit,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace,
- final SerializationFig config,
- final EntityVersionCleanupFactory entityVersionCleanupFactory,
- final EntityVersionCreatedFactory entityVersionCreatedFactory,
- final EntityDeletedFactory entityDeletedFactory,
- public EntityCollectionManagerImpl( @Write final WriteStart writeStart, @WriteUpdate final WriteStart writeUpdate,
- final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit, final RollbackAction rollback,
- final MarkStart markStart, final MarkCommit markCommit,
- @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace, final SerializationFig config,
- @Assisted final CollectionScope collectionScope ) {
++ public EntityCollectionManagerImpl(
++ @Write final WriteStart writeStart,
++ @WriteUpdate final WriteStart writeUpdate,
++ final WriteUniqueVerify writeVerifyUnique,
++ final WriteOptimisticVerify writeOptimisticVerify,
++ final WriteCommit writeCommit,
++ final RollbackAction rollback,
++ final MarkStart markStart,
++ final MarkCommit markCommit,
++ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
++ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
++ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
++ final Keyspace keyspace,
++ final SerializationFig config,
++ final EntityVersionCleanupFactory entityVersionCleanupFactory,
++ final EntityVersionCreatedFactory entityVersionCreatedFactory,
++ final EntityDeletedFactory entityDeletedFactory,
+ @CollectionTaskExecutor final TaskExecutor taskExecutor,
- @Assisted final CollectionScope collectionScope
++ @Assisted final CollectionScope collectionScope
+ ) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
@@@ -176,42 -161,25 +187,42 @@@
// observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
- observable.map(writeCommit).doOnNext(new Action1<Entity>() {
- // return the commit result.
- return observable.map( writeCommit ).doOnError( rollback );
++ return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
+ @Override
+ public void call(final Entity entity) {
+ //TODO fire the created task first then the entityVersioncleanup
+ taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+ taskExecutor.submit(entityVersionCleanupFactory.getTask(collectionScope, entityId,entity.getVersion()));
+ //post-processing to come later. leave it empty for now.
+ }
+ }).doOnError(rollback);
-
-
- // return the commit result.
- return observable.map(writeCommit).doOnError(rollback);
}
@Override
- public Observable<Void> delete(final Id entityId) {
-
- Preconditions.checkNotNull(entityId, "Entity id is required in this stage");
- Preconditions.checkNotNull(entityId.getUuid(), "Entity id is required in this stage");
- Preconditions.checkNotNull(entityId.getType(), "Entity type is required in this stage");
-
- public Observable<Void> delete( final Id entityId ) {
++ public Observable<Id> delete( final Id entityId ) {
+
+ Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
+ Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
+ Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
+
- return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( markStart )
- .doOnNext( markCommit ).map( new Func1<CollectionIoEvent<MvccEntity>, Void>() {
- @Override
- public Void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
- return null;
- }
- } );
++ Observable<Id> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId))
++ .map( markStart)
++ .doOnNext( markCommit)
++ .map( new Func1<CollectionIoEvent<MvccEntity>, Id>() {
++
++ @Override
++ public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
++ MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
++ Task<Void> task = entityDeletedFactory
++ .getTask( collectionScope, entity.getId(), entity.getVersion());
++ taskExecutor.submit(task);
++ return entity.getId();
++ }
++ }
++ );
+
- Observable<Void> o = Observable.from(new CollectionIoEvent<Id>(collectionScope, entityId)).map(markStart)
- .doOnNext(markCommit).map(new Func1<CollectionIoEvent<MvccEntity>, Void>() {
- @Override
- public Void call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
- MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
- Task<Void> task = entityDeletedFactory.getTask(
- collectionScope,entity.getId(),entity.getVersion());
- taskExecutor.submit(task);
- return null;
- }
- });
+ return o;
}
@@@ -289,23 -261,30 +304,31 @@@
final Id entityId = entity.getId();
- ValidationUtils.verifyIdentity(entityId);
+
+ ValidationUtils.verifyIdentity( entityId );
// create our observable and start the write
- CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(collectionScope, entity);
+ CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
- Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner(writeData, writeUpdate);
- return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
+ Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeUpdate );
+
+
+ return observable.map( writeCommit ).doOnNext( new Action1<Entity>() {
@Override
- public void call(final Entity entity) {
- logger.debug("sending entity to the queue");
+ public void call( final Entity entity ) {
+ logger.debug( "sending entity to the queue" );
//we an update, signal the fix
+ taskExecutor.submit(entityVersionCreatedFactory.getTask(collectionScope,entity));
+ //TODO T.N Change this to fire a task
+ // Observable.from( new CollectionIoEvent<Id>(collectionScope,
+ // entityId ) ).map( load ).subscribeOn( Schedulers.io() ).subscribe();
+
+
}
- }).doOnError(rollback);
+ } ).doOnError( rollback );
}
@@@ -381,5 -358,4 +402,4 @@@
return Health.RED;
}
-
--}
++}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 284cf5b,0000000..9ff4f56
mode 100644,000000..100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@@ -1,147 -1,0 +1,148 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.MutationBatch;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.Set;
+import java.util.UUID;
++import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+
+
+/**
+ * Fires Cleanup Task
+ */
+public class EntityDeletedTask implements Task<Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class);
+
- private EntityVersionCleanupFactory entityVersionCleanupFactory;
- private MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private MvccEntitySerializationStrategy entitySerializationStrategy;
- private Set<EntityDeleted> listeners;
++ private final EntityVersionCleanupFactory entityVersionCleanupFactory;
++ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
++ private final MvccEntitySerializationStrategy entitySerializationStrategy;
++ private final Set<EntityDeleted> listeners;
+ private final CollectionScope collectionScope;
+ private final Id entityId;
+ private final UUID version;
+
+
+ @Inject
+ public EntityDeletedTask(
+ EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
++ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject
+ @Assisted final CollectionScope collectionScope,
+ @Assisted final Id entityId,
+ @Assisted final UUID version) {
+
+ this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.listeners = listeners;
+ this.collectionScope = collectionScope;
+ this.entityId = entityId;
+ this.version = version;
+ }
+
+
+ @Override
+ public void exceptionThrown(Throwable throwable) {
+ LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+ new Object[] { collectionScope, entityId, version }, throwable );
+ }
+
+
+ @Override
+ public Void rejected() {
+ try {
+ call();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Exception thrown in call task", e );
+ }
+
+ return null;
+ }
+
+
+ @Override
+ public Void call() throws Exception {
+
+ entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
+
+ fireEvents();
+ final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
+ final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
+ entityDelete.execute();
+ logDelete.execute();
+
+ return null;
+ }
+
+
+ private void fireEvents() {
+ final int listenerSize = listeners.size();
+
+ if ( listenerSize == 0 ) {
+ return;
+ }
+
+ if ( listenerSize == 1 ) {
+ listeners.iterator().next().deleted( collectionScope, entityId,version );
+ return;
+ }
+
+ LOG.debug( "Started firing {} listeners", listenerSize );
+
+ //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
+ Observable.from(listeners)
+ .parallel( new Func1<Observable<EntityDeleted>, Observable<EntityDeleted>>() {
+
+ @Override
+ public Observable<EntityDeleted> call(
+ final Observable<EntityDeleted> entityVersionDeletedObservable ) {
+
+ return entityVersionDeletedObservable.doOnNext( new Action1<EntityDeleted>() {
+ @Override
+ public void call( final EntityDeleted listener ) {
+ listener.deleted(collectionScope, entityId, version);
+ }
+ } );
+ }
+ }, Schedulers.io() ).toBlocking().last();
+
+ LOG.debug( "Finished firing {} listeners", listenerSize );
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index d4bf6e6,422a155..efecdeb
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@@ -45,7 -22,6 +45,8 @@@ import org.apache.usergrid.persistence.
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import java.util.Set;
++import org.apache.usergrid.persistence.core.guice.ProxyImpl;
import rx.Observable;
import rx.functions.Action1;
@@@ -75,17 -51,11 +76,17 @@@ public class EntityVersionCleanupTask i
private final UUID version;
- public EntityVersionCleanupTask( final SerializationFig serializationFig,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final Keyspace keyspace, final List<EntityVersionDeleted> listeners,
- final CollectionScope scope, final Id entityId, final UUID version ) {
+ @Inject
+ public EntityVersionCleanupTask(
+ final SerializationFig serializationFig,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
++ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final Keyspace keyspace,
+ final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject
+ @Assisted final CollectionScope scope,
+ @Assisted final Id entityId,
+ @Assisted final UUID version ) {
this.serializationFig = serializationFig;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
@@@ -123,79 -93,66 +124,81 @@@
@Override
public Void call() throws Exception {
-
-
- final UUID maxVersion = version;
-
-
- Observable<MvccLogEntry> versions = Observable.create( new ObservableIterator( "versionIterators" ) {
- @Override
- protected Iterator getIterator() {
- return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, maxVersion,
- serializationFig.getBufferSize() );
- }
- } );
-
-
- //get the uuid from the version
- versions.map( new Func1<MvccLogEntry, UUID>() {
- @Override
- public UUID call( final MvccLogEntry mvccLogEntry ) {
- return mvccLogEntry.getVersion();
- }
- } )
- //buffer our versions
- .buffer( serializationFig.getBufferSize() )
- //for each buffer set, delete all of them
- .doOnNext( new Action1<List<UUID>>() {
- @Override
- public void call( final List<UUID> versions ) {
-
- //Fire all the listeners
- fireEvents( versions );
-
- MutationBatch entityBatch = keyspace.prepareMutationBatch();
- MutationBatch logBatch = keyspace.prepareMutationBatch();
-
- for ( UUID version : versions ) {
- final MutationBatch entityDelete = entitySerializationStrategy.delete( scope, entityId, version );
-
- entityBatch.mergeShallow( entityDelete );
-
- final MutationBatch logDelete = logEntrySerializationStrategy.delete( scope, entityId, version );
-
- logBatch.mergeShallow( logDelete );
+ //TODO Refactor this logic into a a class that can be invoked from anywhere
+ //load every entity we have history of
+ Observable<List<MvccEntity>> deleteFieldsObservable =
+ Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
+ @Override
+ protected Iterator<MvccEntity> getIterator() {
- Iterator<MvccEntity> entities = entitySerializationStrategy
- .load(scope, entityId, version, serializationFig.getBufferSize());
++ Iterator<MvccEntity> entities = entitySerializationStrategy.loadDescendingHistory(
++ scope, entityId, version, 1000); // TODO: what fetchsize should we use here?
+ return entities;
}
- }) //buffer them for efficiency
++ })
++ //buffer them for efficiency
+ .skip(1)
+ .buffer(serializationFig.getBufferSize()).doOnNext(
+ new Action1<List<MvccEntity>>() {
+ @Override
+ public void call(final List<MvccEntity> mvccEntities) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch entityBatch = keyspace.prepareMutationBatch();
+ final MutationBatch logBatch = keyspace.prepareMutationBatch();
+
+ for (MvccEntity mvccEntity : mvccEntities) {
+ if (!mvccEntity.getEntity().isPresent()) {
+ continue;
+ }
+
+ final UUID entityVersion = mvccEntity.getVersion();
+ final Entity entity = mvccEntity.getEntity().get();
+
+ //remove all unique fields from the index
+ for (final Field field : entity.getFields()) {
+ if (!field.isUnique()) {
+ continue;
+ }
+ final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
- final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(scope,unique);
++ final MutationBatch deleteMutation =
++ uniqueValueSerializationStrategy.delete(scope,unique);
+ batch.mergeShallow(deleteMutation);
+ }
+
+ final MutationBatch entityDelete = entitySerializationStrategy
+ .delete(scope, entityId, mvccEntity.getVersion());
+ entityBatch.mergeShallow(entityDelete);
+ final MutationBatch logDelete = logEntrySerializationStrategy
+ .delete(scope, entityId, version);
+ logBatch.mergeShallow(logDelete);
+ }
+
+ try {
+ batch.execute();
+ } catch (ConnectionException e1) {
+ throw new RuntimeException("Unable to execute " +
+ "unique value " +
+ "delete", e1);
+ }
+ fireEvents(mvccEntities);
+ try {
+ entityBatch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Unable to delete entities in cleanup", e);
+ }
+
+ try {
+ logBatch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Unable to delete entities from the log", e);
+ }
-
- try {
- entityBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to delete entities in cleanup", e );
- }
-
- try {
- logBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to delete entities from the log", e );
}
}
- } ).count().toBlocking().last();
+ );
+
+ final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+
+ logger.debug("Removed unique values for {} entities of entity {}",removedCount,entityId);
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index 994465d,e31aba5..baf2ac3
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@@ -19,8 -19,8 +19,6 @@@
package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
--import java.util.Iterator;
--import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
@@@ -36,13 -36,14 +34,9 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
--import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
--import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
- import org.apache.usergrid.persistence.core.rx.ObservableIterator;
- import org.apache.usergrid.persistence.model.entity.Entity;
+ import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
-import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
--import org.apache.usergrid.persistence.model.field.Field;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@@ -51,7 -52,7 +45,6 @@@ import com.netflix.astyanax.Keyspace
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
--import rx.Observable;
import rx.functions.Action1;
@@@ -119,72 -120,69 +112,70 @@@ public class MarkCommit implements Acti
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to mark entry as deleted" );
}
- //<<<<<<< HEAD
- //=======
-
-
- //TODO Refactor this logic into a a class that can be invoked from anywhere
- //load every entity we have history of
- Observable<List<MvccEntity>> deleteFieldsObservable =
- Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
- @Override
- protected Iterator<MvccEntity> getIterator() {
- Iterator<MvccEntity> entities =
- entityStrat.loadDescendingHistory( collectionScope, entityId, entity.getVersion(), 100 );
-
- return entities;
- }
- } ) //buffer them for efficiency
- .buffer( serializationFig.getBufferSize() ).doOnNext(
-
- new Action1<List<MvccEntity>>() {
- @Override
- public void call( final List<MvccEntity> mvccEntities ) {
-
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( MvccEntity mvccEntity : mvccEntities ) {
- if ( !mvccEntity.getEntity().isPresent() ) {
- continue;
- }
-
- final UUID entityVersion = mvccEntity.getVersion();
-
- final Entity entity = mvccEntity.getEntity().get();
-
- //remove all unique fields from the index
- for ( final Field field : entity.getFields() ) {
-
- if(!field.isUnique()){
- continue;
- }
-
- final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
-
- final MutationBatch deleteMutation = uniqueValueStrat.delete(collectionScope, unique );
-
- batch.mergeShallow( deleteMutation );
- }
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e1 ) {
- throw new RuntimeException( "Unable to execute " +
- "unique value " +
- "delete", e1 );
- }
- }
- }
-
-
- );
-
- final int removedCount = deleteFieldsObservable.count().toBlocking().last();
-
- LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
+ }
+ }
++
+//
+//
+// //TODO Refactor this logic into a a class that can be invoked from anywhere
+// //load every entity we have history of
+// Observable<List<MvccEntity>> deleteFieldsObservable =
+// Observable.create( new ObservableIterator<MvccEntity>( "deleteColumns" ) {
+// @Override
+// protected Iterator<MvccEntity> getIterator() {
+// Iterator<MvccEntity> entities =
+// entityStrat.load( collectionScope, entityId, entity.getVersion(), 100 );
+//
+// return entities;
+// }
+// } ) //buffer them for efficiency
+// .buffer( serializationFig.getBufferSize() ).doOnNext(
+//
+// new Action1<List<MvccEntity>>() {
+// @Override
+// public void call( final List<MvccEntity> mvccEntities ) {
+//
+//
+// final MutationBatch batch = keyspace.prepareMutationBatch();
+//
+// for ( MvccEntity mvccEntity : mvccEntities ) {
+// if ( !mvccEntity.getEntity().isPresent() ) {
+// continue;
+// }
+//
+// final UUID entityVersion = mvccEntity.getVersion();
+//
+// final Entity entity = mvccEntity.getEntity().get();
+//
+// //remove all unique fields from the index
+// for ( final Field field : entity.getFields() ) {
+//
+// if(!field.isUnique()){
+// continue;
+// }
+//
+// final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion );
+//
+// final MutationBatch deleteMutation = uniqueValueStrat.delete(collectionScope, unique );
+//
+// batch.mergeShallow( deleteMutation );
+// }
+// }
+//
+// try {
+// batch.execute();
+// }
+// catch ( ConnectionException e1 ) {
+// throw new RuntimeException( "Unable to execute " +
+// "unique value " +
+// "delete", e1 );
+// }
+// }
+// }
+//
+//
+// );
+//
+// final int removedCount = deleteFieldsObservable.count().toBlocking().last();
+//
+// LOG.debug("Removed unique values for {} entities of entity {}", removedCount, entityId );
- //>>>>>>> befcdcab6f7f1c83dbcb2a24eddb055c9297d59f
- }
- }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
index 73f1b6d,e2e6876..dd3c013
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestCollectionModule.java
@@@ -21,7 -21,12 +21,8 @@@ package org.apache.usergrid.persistence
import org.apache.usergrid.persistence.core.guice.CommonModule;
+ import org.apache.usergrid.persistence.core.guice.MaxMigrationModule;
-import org.apache.usergrid.persistence.core.guice.MaxMigrationVersion;
import org.apache.usergrid.persistence.core.guice.TestModule;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-
-import com.google.inject.multibindings.Multibinder;
public class TestCollectionModule extends TestModule {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3261795/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
index eae1c54,1fce6e2..d0a87c3
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTaskTest.java
@@@ -139,31 -119,22 +139,32 @@@ public class EntityVersionCleanupTaskTe
final MutationBatch newBatch = mock( MutationBatch.class );
- //set up returning a mutator
- when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ // set up returning a mutator
+ when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( newBatch );
-
- when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when(less.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( newBatch );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //start the task
- ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- //wait for the task
- future.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
- //verify it was run
++ when( ess.loadDescendingHistory(
++ same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
+
+ try {
+ cleanupTask.call();
+ }catch(Exception e){
+ Assert.fail( e.getMessage() );
+ }
+
+ // verify it was run
verify( entityBatch ).execute();
verify( logBatch ).execute();
@@@ -239,38 -195,24 +240,38 @@@
//set up returning a mutator
- when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when(ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
-
- when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when( mvccLogEntrySerializationStrategy
+ .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //start the task
- ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- //wait for the task
- future.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
- //verify it was run
- verify( entityBatch, never() ).execute();
++ when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
- verify( logBatch, never() ).execute();
+ //start the task
+ try {
+ cleanupTask.call();
+ }catch(Exception e){
+ Assert.fail( e.getMessage() );
+ }
+
+
+ // These last two verify statements do not make sense. We cannot assert that the entity
+ // and log batches are never called. Even if there are no listeners the entity delete
+ // cleanup task will still run to do the normal cleanup.
+ //
+ // verify( entityBatch, never() ).execute();
+ // verify( logBatch, never() ).execute();
}
@@@ -358,23 -283,11 +359,23 @@@
.thenReturn( batch );
- //start the task
- ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //wait for the task
- future.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
+
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
+
- when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
++ when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
+
+
+ try {
+ cleanupTask.call();
+ }catch(Exception e){
+ Assert.fail( e.getMessage() );
+ }
//we deleted the version
//verify it was run
@@@ -462,36 -366,27 +463,36 @@@
//set up returning a mutator
- when( mvccEntitySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when( ess.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
-
- when( mvccLogEntrySerializationStrategy.delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
+ when( mvccLogEntrySerializationStrategy
+ .delete( same( appScope ), same( entityId ), any( UUID.class ) ) )
.thenReturn( batch );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //start the task
- ListenableFuture<Void> future = taskExecutor.submit( cleanupTask );
+ Entity entity = new Entity( entityId );
- //wait for the task
- future.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.of(entity)) );
- //we deleted the version
- //verify we deleted everything
- verify( entityBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.of(entity)) );
- when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
- verify( logBatch, times( sizeToReturn ) ).mergeShallow( any( MutationBatch.class ) );
++ when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
+ try {
+ cleanupTask.call();
+ }catch(Exception e){
+ Assert.fail( e.getMessage() );
+ }
+ //we deleted the version
+ //verify we deleted everything
+ verify( entityBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
+ verify( logBatch, times( 1 ) ).mergeShallow( any( MutationBatch.class ) );
verify( logBatch ).execute();
@@@ -729,29 -608,41 +730,29 @@@
.thenReturn( batch );
- //start the task
- ListenableFuture<Void> future1 = taskExecutor.submit( firstTask );
-
- //now start another task while the slow running task is running
- ListenableFuture<Void> future2 = taskExecutor.submit( secondTask );
+ final List<MvccEntity> mel = new ArrayList<MvccEntity>();
- //get the second task, we shouldn't have been able to queue it, therefore it should just run in process
- future2.get();
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- /**
- * While we're not done, release latches every 200 ms
- */
- while ( !future1.isDone() ) {
- Thread.sleep( 200 );
- waitSemaphore.release( listenerCount );
- }
+ mel.add( new MvccEntityImpl( entityId, UUIDGenerator.newTimeUUID(),
+ MvccEntity.Status.DELETED, Optional.fromNullable((Entity)null)) );
- when( ess.load( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
- //wait for the task
- future1.get();
++ when( ess.loadDescendingHistory( same( appScope ), same( entityId ), any(UUID.class), any(Integer.class) ) )
+ .thenReturn(mel.iterator() );
- //we deleted the version
- //verify we deleted everything
+ try {
+ cleanupTask.rejected();
+ }catch(Exception e){
+ Assert.fail(e.getMessage());
+ }
//we deleted the version
- //verify we deleted everything
- verify( logBatch, times( sizeToReturn* 2 ) ).mergeShallow( any( MutationBatch.class ) );
-
- verify( entityBatch, times( sizeToReturn * 2) ).mergeShallow( any( MutationBatch.class ) );
-
-
- verify( logBatch, times(2) ).execute();
-
- verify( entityBatch, times(2) ).execute();
+ //verify it was run
+ verify( entityBatch ).execute();
+ verify( logBatch ).execute();
//the latch was executed
latch.await();