You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/10 19:09:01 UTC
[4/5] incubator-usergrid git commit: added new entity object for
serialization
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index af8bbed..2509771 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -24,6 +24,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -32,7 +37,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
@@ -40,9 +45,7 @@ import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
@@ -66,8 +69,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
private Injector injector;
- private EntityDataMigration entityDataMigration;
- private ManagerCache managerCache;
+ private DataMigration entityDataMigration;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
private MvccEntitySerializationStrategy v1Strategy;
@@ -80,19 +82,21 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
* Rule to do the resets we need
*/
@Rule
- public MigrationTestRule migrationTestRule =
- new MigrationTestRule( app, CpSetup.getInjector() ,EntityDataMigration.class );
+ public MigrationTestRule migrationTestRule =
+ new MigrationTestRule( app, CpSetup.getInjector() ,MvccEntitySerializationStrategyProxyV2Impl.class );
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
public void setup() {
emf = setup.getEmf();
injector = CpSetup.getInjector();
- entityDataMigration = injector.getInstance( EntityDataMigration.class );
- managerCache = injector.getInstance( ManagerCache.class );
+ entityDataMigration = injector.getInstance( MvccEntitySerializationStrategyProxyV2Impl.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
- v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
- v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+ MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
+ v1Strategy = strategy.getMigration().from();
+ v2Strategy = strategy.getMigration().to();
}
@@ -131,11 +135,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types. Assumes we're
//using a test system, and it's not a huge amount of data, otherwise we'll overflow.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity ) {
//add all versions from history to our comparison
for ( final Id id : entity.entityIds ) {
@@ -167,7 +171,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
assertTrue( "Saved new entities", savedEntities.size() > 0 );
//perform the migration
- entityDataMigration.migrate( progressObserver );
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(ApplicationEntityGroup applicationEntityGroup) {
+ try {
+ entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
+ }catch (Throwable e){
+ throw new RuntimeException(e);
+ }
+ }
+ }).toBlocking().last();
+
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -183,12 +198,11 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//now visit all entities in the system again, load them from v2, and ensure they're the same
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable
- .ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity ) {
//add all versions from history to our comparison
for ( final Id id : entity.entityIds ) {
@@ -218,16 +232,15 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
assertEquals( "All entities migrated", 0, savedEntities.size() );
- //now visit all entities in the system again, and load them from the EM,
+ //now visit all entities in the system again, and load them from the EM,
// ensure we see everything we did in the v1 traversal
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable
- .ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity ) {
- final EntityManager em = emf.getEntityManager(
+ final EntityManager em = emf.getEntityManager(
entity.applicationScope.getApplication().getUuid() );
//add all versions from history to our comparison
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index 266fb17..3da0b85 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -23,6 +23,8 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -31,7 +33,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
@@ -69,9 +71,9 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
* Rule to do the resets we need
*/
@Rule
- public MigrationTestRule migrationTestRule = new MigrationTestRule(
+ public MigrationTestRule migrationTestRule = new MigrationTestRule(
app, CpSetup.getInjector() ,EntityTypeMappingMigration.class );
-
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
@@ -82,6 +84,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
keyspace = injector.getInstance( Keyspace.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
}
@@ -114,62 +117,69 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
keyspace.truncateColumnFamily( MapSerializationImpl.MAP_ENTRIES );
keyspace.truncateColumnFamily( MapSerializationImpl.MAP_KEYS );
- app.createApplication(
- GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(),
+ app.createApplication(
+ GraphShardVersionMigrationIT.class.getSimpleName()+ UUIDGenerator.newTimeUUID(),
"migrationTest" );
final TestProgressObserver progressObserver = new TestProgressObserver();
- entityTypeMappingMigration.migrate( progressObserver );
-
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(final ApplicationEntityGroup entity) {
+ try {
+ entityTypeMappingMigration.migrate(entity, progressObserver).toBlocking().last();
+ }catch (Throwable e ){
+ throw new RuntimeException(e);
+ }
+ }
+ });
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity) {
//ensure that each one has a type
final EntityManager em = emf.getEntityManager(
- entity.applicationScope.getApplication().getUuid() );
+ entity.applicationScope.getApplication().getUuid());
- for ( final Id id : entity.entityIds ) {
+ for (final Id id : entity.entityIds) {
try {
- final Entity returned = em.get( id.getUuid() );
+ final Entity returned = em.get(id.getUuid());
//we seem to occasionally get phantom edges. If this is the
// case we'll store the type -> uuid mapping, but we won't have
// anything to load
- if ( returned != null ) {
- assertEquals( id.getUuid(), returned.getUuid() );
- assertEquals( id.getType(), returned.getType() );
- }
- else {
- final String type = managerCache.getMapManager( CpNamingUtils
- .getEntityTypeMapScope(
- entity.applicationScope.getApplication() ) )
- .getString( id.getUuid()
- .toString() );
-
- assertEquals( id.getType(), type );
+ if (returned != null) {
+ assertEquals(id.getUuid(), returned.getUuid());
+ assertEquals(id.getType(), returned.getType());
+ } else {
+ final String type = managerCache.getMapManager(CpNamingUtils
+ .getEntityTypeMapScope(
+ entity.applicationScope.getApplication()))
+ .getString(id.getUuid()
+ .toString());
+
+ assertEquals(id.getType(), type);
}
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Unable to get entity " + id
- + " by UUID, migration failed", e );
- }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to get entity " + id
+ + " by UUID, migration failed", e);
+ }
- allEntities.remove( id );
- }
- }
- } ).toBlocking().lastOrDefault( null );
+ allEntities.remove(id);
+ }
+ }
+ }).toBlocking().lastOrDefault(null);
- assertEquals( "Every element should have been encountered", 0, allEntities.size() );
- assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
- assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
- }
-}
+ assertEquals("Every element should have been encountered", 0, allEntities.size());
+ assertFalse("Progress observer should not have failed", progressObserver.getFailed());
+ assertTrue("Progress observer should have update messages", progressObserver.getUpdates().size() > 0);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index f287047..2d6d0d9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -23,6 +23,10 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -31,7 +35,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
@@ -55,7 +59,7 @@ import org.junit.Ignore;
public class GraphShardVersionMigrationIT extends AbstractCoreIT {
private Injector injector;
- private GraphShardVersionMigration graphShardVersionMigration;
+ private DataMigration graphShardVersionMigration;
private ManagerCache managerCache;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,18 +69,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
* Rule to do the resets we need
*/
@Rule
- public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,GraphShardVersionMigration.class );
-
+ public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,EdgeMetadataSerializationProxyImpl.class );
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
public void setup() {
injector = CpSetup.getInjector();
- graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+ graphShardVersionMigration = injector.getInstance( EdgeMetadataSerializationProxyImpl.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
}
@@ -114,11 +118,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
@@ -154,7 +158,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//perform the migration
- graphShardVersionMigration.migrate( progressObserver );
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final ApplicationEntityGroup entity) {
+ try {
+ graphShardVersionMigration.migrate(entity, progressObserver).toBlocking().last();
+ }catch (Throwable e){
+ throw new RuntimeException(e);
+ }
+ }
+ }).toBlocking().last();
assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
@@ -171,12 +186,11 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final AllEntitiesInSystemObservable
- .ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 4d1c6c9..30c9ac3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,6 +23,11 @@ package org.apache.usergrid.corepersistence.rx;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +60,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+ TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
final EntityManager em = app.getEntityManager();
final String type1 = "type1thing";
@@ -92,9 +100,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ public void call( final ApplicationEntityGroup entity ) {
assertNotNull(entity);
assertNotNull(entity.applicationScope);
@@ -125,7 +133,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
//test connections
- TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index f8f3c50..7c902ea 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,21 +24,19 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
import rx.functions.Action1;
-import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
@@ -52,6 +50,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
final Application createdApplication = app.getEntityManager().getApplication();
+ ApplicationObservable applicationObservable = CpSetup.getInjector().getInstance(ApplicationObservable.class);
//now our get all apps we expect. There may be more, but we don't care about those.
final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -66,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
//clean up our wiring
ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
- Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds( managerCache );
+ Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
appObservable.doOnNext( new Action1<Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 2aa7fbc..2564fe5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -60,6 +61,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ EdgesToTargetObservable edgesToTargetObservable = CpSetup.getInjector().getInstance(EdgesToTargetObservable.class);
final EntityManager em = app.getEntityManager();
final Application createdApplication = em.getApplication();
@@ -96,7 +98,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- EdgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+ edgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index ef0d953..5d25f62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ EdgesFromSourceObservable edgesFromSourceObservable= CpSetup.getInjector().getInstance(EdgesFromSourceObservable.class);
final EntityManager em = app.getEntityManager();
final String type1 = "type1things";
@@ -92,7 +94,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- EdgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
+ edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
@@ -124,7 +126,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
//test connections
- EdgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
+ edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index cde8866..e5b0319 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
final EntityManager em = app.getEntityManager();
@@ -93,7 +96,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- TargetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
@@ -116,7 +119,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
//test connections
- TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 4de18fe..90cade0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection;
import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,7 +32,7 @@ import rx.Observable;
/**
* The operations for performing changes on an entity
*/
-public interface EntityCollectionManager {
+public interface EntityCollectionManager extends CPManager {
/**
* Write the entity in the entity collection. This is an entire entity, its contents will
@@ -68,12 +70,12 @@ public interface EntityCollectionManager {
/**
* Takes the change and reloads an entity with all changes in this entity applied.
- * The resulting entity from calling load will be the previous version of this entity plus
+ * The resulting entity from calling load will be the previous version of this entity plus
* the entity in this object applied to it.
*/
public Observable<Entity> update ( Entity entity );
- /**
+ /**
* Returns health of entity data store.
*/
public Health getHealth();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index ef579f8..9140913 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -20,40 +20,42 @@ package org.apache.usergrid.persistence.collection;
/**
- * A basic factory that creates a collection manager with the given context.
+ * A basic factory that creates a collection manager with the given context.
* Each instance of this factory should exist for a Single ApplicationScope
*/
public interface EntityCollectionManagerFactory {
/**
- * Create a new EntityCollectionManager for the given context.
- * The EntityCollectionManager can safely be used on the current thread
+ * Create a new EntityCollectionManager for the given context.
+ * The EntityCollectionManager can safely be used on the current thread
* and will shard responses. The returned instance should not be shared
* among threads it will not be guaranteed to be thread safe.
*
- * @param collectionScope The collectionScope collectionScope to use
+ * @param collectionScope The collectionScope collectionScope to use
* when creating the collectionScope manager
*
* @return The collectionScope manager to perform operations within the provided context
*/
- public EntityCollectionManager
+ public EntityCollectionManager
createCollectionManager( CollectionScope collectionScope );
/**
- * Create a new EntityCollectionManagerSync for the given context.
- * The EntityCollectionManager can safely be used on the current thread
+ * Create a new EntityCollectionManagerSync for the given context.
+ * The EntityCollectionManager can safely be used on the current thread
* and will shard responses. The returned instance should not be shared
- * among threads it will not be guaranteed to be thread safe.
+ * among threads it will not be guaranteed to be thread safe.
* This implementation will be synchronous. Try to use the org.apache.usergrid.persistence.core.consistency
* implementation if possible
*
- * @param collectionScope The collectionScope collectionScope to use when
+ * @param collectionScope The collectionScope collectionScope to use when
* creating the collectionScope manager
*
* @return The collectionScope manager to perform operations within the provided context
*/
- public EntityCollectionManagerSync
+ public EntityCollectionManagerSync
createCollectionManagerSync( CollectionScope collectionScope );
+
+ void invalidate(); // discards any collection managers the factory has cached (see EntityCollectionManagerFactoryImpl)
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 1c3e258..c5dd961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.collection.guice;
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -81,10 +82,7 @@ public class CollectionModule extends AbstractModule {
Multibinder.newSetBinder( binder(), EntityDeleted.class );
// create a guice factor for getting our collection manager
- install( new FactoryModuleBuilder()
- .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
- .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
- .build( EntityCollectionManagerFactory.class ) );
+ bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
@@ -116,7 +114,7 @@ public class CollectionModule extends AbstractModule {
@Provides
@CollectionTaskExecutor
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks",
+ return new NamedTaskExecutorImpl( "collectiontasks",
serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
new file mode 100644
index 0000000..790be19
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import org.apache.usergrid.persistence.collection.*;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.*;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+/**
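+ * Default {@link EntityCollectionManagerFactory}: caches one {@link EntityCollectionManager} per {@link CollectionScope} (at most 1000 entries) and creates sync managers on demand.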
+ */
+public class EntityCollectionManagerFactoryImpl implements EntityCollectionManagerFactory {
+
+
+ private final WriteStart writeStart;
+ private final WriteStart writeUpdate;
+ private final WriteUniqueVerify writeVerifyUnique;
+ private final WriteOptimisticVerify writeOptimisticVerify;
+ private final WriteCommit writeCommit;
+ private final RollbackAction rollback;
+ private final MarkStart markStart;
+ private final MarkCommit markCommit;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+ private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+ private final Keyspace keyspace;
+ private final SerializationFig config;
+ private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+ private final EntityVersionCreatedFactory entityVersionCreatedFactory;
+ private final EntityDeletedFactory entityDeletedFactory;
+ private final TaskExecutor taskExecutor;
+ private final CollectionScope collectionScope;
+    /**
+     * Cache of collection managers keyed by the scope they operate on. At most 1000
+     * managers are retained; a missing entry is built on demand by the loader below.
+     */
+    private final LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
+        CacheBuilder.newBuilder().maximumSize( 1000 )
+            .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
+                @Override
+                public EntityCollectionManager load( CollectionScope scope ) {
+                    // Build the manager for the scope that is being looked up. Using the
+                    // factory-level collectionScope field here would bind every cached
+                    // manager to the same scope regardless of the cache key.
+                    return new EntityCollectionManagerImpl(
+                        writeStart,
+                        writeUpdate,
+                        writeVerifyUnique,
+                        writeOptimisticVerify,
+                        writeCommit,
+                        rollback,
+                        markStart,
+                        markCommit,
+                        entitySerializationStrategy,
+                        uniqueValueSerializationStrategy,
+                        mvccLogEntrySerializationStrategy,
+                        keyspace,
+                        config,
+                        entityVersionCleanupFactory,
+                        entityVersionCreatedFactory,
+                        entityDeletedFactory,
+                        taskExecutor,
+                        scope );
+                }
+            } );
+
+
+
+ public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart, // Captures every collaborator in a field; no other work is done here.
+ @WriteUpdate final WriteStart writeUpdate,
+ final WriteUniqueVerify writeVerifyUnique,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit,
+ final RollbackAction rollback,
+ final MarkStart markStart,
+ final MarkCommit markCommit,
+ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+ final Keyspace keyspace,
+ final SerializationFig config,
+ final EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final EntityVersionCreatedFactory entityVersionCreatedFactory,
+ final EntityDeletedFactory entityDeletedFactory,
+ @CollectionTaskExecutor final TaskExecutor taskExecutor,
+ @Assisted final CollectionScope collectionScope){ // NOTE(review): @Assisted suggests assisted injection, yet CollectionModule binds this class with a plain bind().to() and no @Inject is present - confirm how this constructor is invoked.
+
+ this.writeStart = writeStart;
+ this.writeUpdate = writeUpdate;
+ this.writeVerifyUnique = writeVerifyUnique;
+ this.writeOptimisticVerify = writeOptimisticVerify;
+ this.writeCommit = writeCommit;
+ this.rollback = rollback;
+ this.markStart = markStart;
+ this.markCommit = markCommit;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+ this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+ this.keyspace = keyspace;
+ this.config = config;
+ this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+ this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+ this.entityDeletedFactory = entityDeletedFactory;
+ this.taskExecutor = taskExecutor;
+ this.collectionScope = collectionScope;
+ }
+    /**
+     * Returns the manager cached for the given scope, loading it through the cache on a miss.
+     *
+     * @param collectionScope the scope used as the cache key
+     * @return the manager held in the cache for that key
+     * @throws RuntimeException wrapping the ExecutionException raised when the cache load fails
+     */
+    @Override
+    public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
+        final EntityCollectionManager manager;
+        try {
+            manager = ecmCache.get(collectionScope);
+        } catch (ExecutionException loadFailure) {
+            throw new RuntimeException(loadFailure);
+        }
+        return manager;
+    }
+
+    /**
+     * Builds a synchronous manager for the given scope. A new instance is created on
+     * every call; it is handed this factory and the scope.
+     *
+     * @param collectionScope the scope the synchronous manager is created for
+     * @return a newly constructed synchronous manager
+     */
+    @Override
+    public EntityCollectionManagerSync createCollectionManagerSync(CollectionScope collectionScope) {
+        final EntityCollectionManagerSync syncManager =
+            new EntityCollectionManagerSyncImpl(this,collectionScope);
+        return syncManager;
+    }
+
+ @Override
+ public void invalidate() { // Empties the manager cache so later lookups go through the loader again.
+ ecmCache.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index a89924a..11c5d44 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -33,7 +33,7 @@ import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.collection.guice.Write;
import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -117,19 +117,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Inject
- public EntityCollectionManagerImpl(
- @Write final WriteStart writeStart,
+ public EntityCollectionManagerImpl(
+ @Write final WriteStart writeStart,
@WriteUpdate final WriteStart writeUpdate,
final WriteUniqueVerify writeVerifyUnique,
final WriteOptimisticVerify writeOptimisticVerify,
- final WriteCommit writeCommit,
+ final WriteCommit writeCommit,
final RollbackAction rollback,
- final MarkStart markStart,
+ final MarkStart markStart,
final MarkCommit markCommit,
@ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace,
+ final Keyspace keyspace,
final SerializationFig config,
final EntityVersionCleanupFactory entityVersionCleanupFactory,
final EntityVersionCreatedFactory entityVersionCreatedFactory,
@@ -182,9 +182,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- // execute all validation stages concurrently. Needs refactored when this is done.
+ // execute all validation stages concurrently. Needs refactored when this is done.
// https://github.com/Netflix/RxJava/issues/627
- // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
+ // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
@@ -402,4 +402,4 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
return Health.RED;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 9ff4f56..f7d5b58 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -24,7 +24,7 @@ import com.netflix.astyanax.MutationBatch;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.core.task.Task;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -56,13 +56,13 @@ public class EntityDeletedTask implements Task<Void> {
@Inject
- public EntityDeletedTask(
+ public EntityDeletedTask(
EntityVersionCleanupFactory entityVersionCleanupFactory,
final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
@ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject
- @Assisted final CollectionScope collectionScope,
- @Assisted final Id entityId,
+ @Assisted final CollectionScope collectionScope,
+ @Assisted final Id entityId,
@Assisted final UUID version) {
this.entityVersionCleanupFactory = entityVersionCleanupFactory;
@@ -81,7 +81,7 @@ public class EntityDeletedTask implements Task<Void> {
new Object[] { collectionScope, entityId, version }, throwable );
}
-
+
@Override
public Void rejected() {
try {
@@ -94,9 +94,9 @@ public class EntityDeletedTask implements Task<Void> {
return null;
}
-
+
@Override
- public Void call() throws Exception {
+ public Void call() throws Exception {
entityVersionCleanupFactory.getTask( collectionScope, entityId, version ).call();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index efecdeb..2f51eb5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
@@ -55,7 +55,7 @@ import rx.schedulers.Schedulers;
/**
- * Cleans up previous versions from the specified version. Note that this means the version
+ * Cleans up previous versions from the specified version. Note that this means the version
* passed in the io event is retained, the range is exclusive.
*/
public class EntityVersionCleanupTask implements Task<Void> {
@@ -77,7 +77,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
@Inject
- public EntityVersionCleanupTask(
+ public EntityVersionCleanupTask(
final SerializationFig serializationFig,
final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
@ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
@@ -159,7 +159,7 @@ public class EntityVersionCleanupTask implements Task<Void> {
continue;
}
final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
- final MutationBatch deleteMutation =
+ final MutationBatch deleteMutation =
uniqueValueSerializationStrategy.delete(scope,unique);
batch.mergeShallow(deleteMutation);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
new file mode 100644
index 0000000..0a3cd3a
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.mvcc;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+
+/**
+ * Migration strategy specialised for {@link MvccEntitySerializationStrategy}; declares no members of its own.
+ */
+public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
deleted file mode 100644
index 8a13115..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntitySerializationStrategy.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.mvcc;
-
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * The interface that allows us to serialize an entity to disk
- */
-public interface MvccEntitySerializationStrategy extends Migration {
-
- /**
- * Serialize the entity to the data store with the given collection context
- *
- * @param entity The entity to persist
- *
- * @return The MutationBatch operations for this update
- */
- public MutationBatch write( CollectionScope context, MvccEntity entity );
-
-
-
- /**
- * Load the entities into the entitySet from the specified Ids. Loads versions <= the maxVersion
- * @param scope
- * @param entityIds
- * @return
- */
- public EntitySet load( CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
-
- /**
- * Load a list, from highest to lowest of the entity with versions <= version up to maxSize elements
- *
- * @param context The context to persist the entity into
- * @param entityId The entity id to load
- * @param version The max version to seek from. I.E a stored version <= this argument
- * @param fetchSize The fetch size to return for each trip to cassandra.
- *
- * @return An iterator of entities ordered from max(UUID)=> min(UUID). The return value should be null
- * safe and return an empty list when there are no matches
- */
- public Iterator<MvccEntity> loadDescendingHistory( CollectionScope context, Id entityId, UUID version,
- int fetchSize );
-
- /**
- * Load a historical list of entities, from lowest to highest entity with versions < version up to maxSize elements
- *
- * @param context The context to persist the entity into
- * @param entityId The entity id to load
- * @param version The max version to seek to. I.E a stored version < this argument
- * @param fetchSize The fetch size to return for each trip to cassandra.
- * @return An iterator of entities ordered from min(UUID)=> max(UUID). The return value should be null
- * safe and return an empty list when there are no matches
- */
- public Iterator<MvccEntity> loadAscendingHistory( CollectionScope context, Id entityId, UUID version,
- int fetchSize );
-
- /**
- * Mark this this version as deleted from the persistence store, but keep the version to mark that is has been cleared This
- * can be used in a mark+sweep system. The entity with the given version will exist in the context, but no data
- * will be stored
- */
- public MutationBatch mark( CollectionScope context, Id entityId, UUID version );
-
-
- /**
- * Delete the entity from the context with the given entityId and version
- *
- * @param context The context that contains the entity
- * @param entityId The entity id to delete
- * @param version The version to delete
- */
- public MutationBatch delete( CollectionScope context, Id entityId, UUID version );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
index baf2ac3..380bf15 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index d3c8193..8604af6 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.exception.WriteCommitException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
@@ -53,7 +53,7 @@ import rx.functions.Func1;
/**
- * This phase should invoke any finalization, and mark the entity as committed in the
+ * This phase should invoke any finalization, and mark the entity as committed in the
* data store before returning
*/
@Singleton
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
index 4226fe6..1c7909b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/EntityRepairImpl.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.List;
import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLog;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
new file mode 100644
index 0000000..bf1422b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategy.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.schema.Migration;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * The interface that allows us to serialize an entity to disk. The mutating operations
+ * ({@code write}, {@code mark} and {@code delete}) return a MutationBatch describing the change.
+ * NOTE(review): presumably the caller is responsible for executing the returned batch - confirm.
+ */
+public interface MvccEntitySerializationStrategy extends Migration {
+
+ /**
+ * Serialize the entity to the data store with the given collection context
+ *
+ * @param context The collection context to persist the entity into
+ * @param entity The entity to persist
+ * @return The MutationBatch operations for this update
+ */
+ public MutationBatch write(CollectionScope context, MvccEntity entity);
+
+
+ /**
+ * Load the entities into the entitySet from the specified Ids. Loads versions <= the maxVersion
+ *
+ * @param scope The collection scope the entities are loaded from
+ * @param entityIds The ids of the entities to load
+ * @param maxVersion The max version to load. I.E a stored version <= this argument
+ * @return The EntitySet holding the entities that were loaded
+ */
+ public EntitySet load(CollectionScope scope, Collection<Id> entityIds, UUID maxVersion);
+
+ /**
+ * Load the history of a single entity, from highest to lowest version, for versions <= version.
+ * NOTE(review): this summary used to say "up to maxSize elements", but there is no maxSize parameter;
+ * fetchSize is documented below as the per-trip page size - confirm whether the result is bounded.
+ *
+ * @param context The collection context to load the entity from
+ * @param entityId The entity id to load
+ * @param version The max version to seek from. I.E a stored version <= this argument
+ * @param fetchSize The fetch size to return for each trip to cassandra.
+ * @return An iterator of entities ordered from max(UUID)=> min(UUID). Implementations should never
+ * return null; an empty iterator is returned when there are no matches
+ */
+ public Iterator<MvccEntity> loadDescendingHistory(CollectionScope context, Id entityId, UUID version,
+ int fetchSize);
+
+ /**
+ * Load the history of a single entity, from lowest to highest version, for versions < version.
+ * NOTE(review): this summary used to say "up to maxSize elements", but there is no maxSize parameter;
+ * fetchSize is documented below as the per-trip page size - confirm whether the result is bounded.
+ *
+ * @param context The collection context to load the entity from
+ * @param entityId The entity id to load
+ * @param version The max version to seek to. I.E a stored version < this argument
+ * @param fetchSize The fetch size to return for each trip to cassandra.
+ * @return An iterator of entities ordered from min(UUID)=> max(UUID). Implementations should never
+ * return null; an empty iterator is returned when there are no matches
+ */
+ public Iterator<MvccEntity> loadAscendingHistory(CollectionScope context, Id entityId, UUID version,
+ int fetchSize);
+
+ /**
+ * Mark this version as deleted from the persistence store, but keep the version to record that it has
+ * been cleared. This can be used in a mark+sweep system. The entity with the given version will exist
+ * in the context, but no data will be stored
+ *
+ * @param context The context that contains the entity
+ * @param entityId The entity id to mark
+ * @param version The version to mark
+ * @return The MutationBatch operations for this mark
+ */
+ public MutationBatch mark(CollectionScope context, Id entityId, UUID version);
+
+
+ /**
+ * Delete the entity from the context with the given entityId and version
+ *
+ * @param context The context that contains the entity
+ * @param entityId The entity id to delete
+ * @param version The version to delete
+ * @return The MutationBatch operations for this delete
+ */
+ public MutationBatch delete(CollectionScope context, Id entityId, UUID version);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
index 6badbc1..3d36438 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
import org.apache.usergrid.persistence.collection.exception.DataCorruptionException;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
import org.apache.usergrid.persistence.collection.serialization.EntityRepair;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b604e6d7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
new file mode 100644
index 0000000..bb192cd
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import rx.Observable;
+import rx.functions.Func1;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Base proxy for entity serialization while a data migration is in flight. While the version reported
+ * by the DataMigrationManager is lower than getVersion() (see isOldVersion()), write, mark and delete
+ * go to both the previous and the current strategy, merged into a single MutationBatch, and load,
+ * loadDescendingHistory and loadAscendingHistory are served by the previous strategy. After the upgrade
+ * completes, every operation is delegated to the current strategy only.
+ *
+ * migrate() returns an Observable over the entity ids of the given ApplicationEntityGroup. For each id
+ * it reads the descending version history from getMigration().from() and writes every version through
+ * getMigration().to(), executing the accumulated batch every 50 versions and again after each entity;
+ * the value emitted per entity is the running count of versions copied so far.
+ *
+ * getVersion() and getMigration() are not defined in this class; presumably they are declared by
+ * MvccEntityMigrationStrategy and supplied by concrete subclasses - confirm.
+ * NOTE(review): this comment used to call the class the "Version 4 implementation"; the class is
+ * abstract and takes its version from getVersion(), so the number was dropped - confirm.
+ */
+public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+
+
+ private final DataMigrationManager dataMigrationManager; // supplies the currently applied data version
+ protected final Keyspace keyspace; // used to create the aggregate mutation batches
+ protected final MvccEntitySerializationStrategy previous; // read from, and written to, while still migrating
+ protected final MvccEntitySerializationStrategy current; // always written to; sole strategy once migrated
+
+
+ @Inject
+ public MvccEntitySerializationStrategyProxy( final DataMigrationManager dataMigrationManager,
+ final Keyspace keyspace,
+ final MvccEntitySerializationStrategy previous,
+ final MvccEntitySerializationStrategy current) {
+ this.dataMigrationManager = dataMigrationManager;
+ this.keyspace = keyspace;
+ this.previous = previous;
+ this.current = current;
+ }
+
+
+ @Override
+ public MutationBatch write( final CollectionScope context, final MvccEntity entity ) {
+ if ( isOldVersion() ) { // still migrating: write to both strategies in one merged batch
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.write( context, entity ) );
+ aggregateBatch.mergeShallow( current.write( context, entity ) );
+
+ return aggregateBatch;
+ }
+
+ return current.write( context, entity ); // migrated: current strategy only
+ }
+
+
+ @Override
+ public EntitySet load( final CollectionScope scope, final Collection<Id> entityIds, final UUID maxVersion ) {
+ if ( isOldVersion() ) { // still migrating: read from the previous strategy
+ return previous.load( scope, entityIds, maxVersion );
+ }
+
+ return current.load( scope, entityIds, maxVersion );
+ }
+
+
+ @Override
+ public Iterator<MvccEntity> loadDescendingHistory( final CollectionScope context, final Id entityId,
+ final UUID version, final int fetchSize ) {
+ if ( isOldVersion() ) { // still migrating: read from the previous strategy
+ return previous.loadDescendingHistory( context, entityId, version, fetchSize );
+ }
+
+ return current.loadDescendingHistory( context, entityId, version, fetchSize );
+ }
+
+
+ @Override
+ public Iterator<MvccEntity> loadAscendingHistory( final CollectionScope context, final Id entityId,
+ final UUID version, final int fetchSize ) {
+ if ( isOldVersion() ) { // still migrating: read from the previous strategy
+ return previous.loadAscendingHistory( context, entityId, version, fetchSize );
+ }
+
+ return current.loadAscendingHistory( context, entityId, version, fetchSize );
+ }
+
+
+ @Override
+ public MutationBatch mark( final CollectionScope context, final Id entityId, final UUID version ) {
+ if ( isOldVersion() ) { // still migrating: mark in both strategies in one merged batch
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.mark( context, entityId, version ) );
+ aggregateBatch.mergeShallow( current.mark( context, entityId, version ) );
+
+ return aggregateBatch;
+ }
+
+ return current.mark( context, entityId, version ); // migrated: current strategy only
+ }
+
+
+ @Override
+ public MutationBatch delete( final CollectionScope context, final Id entityId, final UUID version ) {
+ if ( isOldVersion() ) { // still migrating: delete from both strategies in one merged batch
+ final MutationBatch aggregateBatch = keyspace.prepareMutationBatch();
+
+ aggregateBatch.mergeShallow( previous.delete( context, entityId, version ) );
+ aggregateBatch.mergeShallow( current.delete( context, entityId, version ) );
+
+ return aggregateBatch;
+ }
+
+ return current.delete( context, entityId, version ); // migrated: current strategy only
+ }
+
+ /**
+ * Return true if we're on an old version, i.e. the version reported by the DataMigrationManager is
+ * strictly lower than this proxy's getVersion(). The check is evaluated on every call, so the proxy
+ * switches to the current strategy as soon as the reported version catches up.
+ */
+ private boolean isOldVersion() {
+ return dataMigrationManager.getCurrentVersion() < getVersion();
+ }
+
+
+ @Override
+ public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+ return Collections.emptyList(); // the proxy declares no column families of its own
+ }
+
+ @Override
+ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
+ final AtomicLong atomicLong = new AtomicLong(); // running count of versions copied by this call
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch(); // one batch shared by every entity of this call
+
+ final List<Id> entityIds = applicationEntityGroup.entityIds;
+
+ final UUID now = UUIDGenerator.newTimeUUID(); // upper version bound for the history reads below
+
+ // go through each entity id in this group, and load its entire
+ // version history from the migration source
+ return Observable.from(entityIds)
+
+ .map(new Func1<Id, Id>() {
+ @Override
+ public Id call(Id entityId) {
+
+ ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
+
+ if (!(applicationScope instanceof CollectionScope)) { // the strategies below require a CollectionScope
+ throw new IllegalArgumentException("getCollectionScopeFromEntityId must return a collection scope"); // NOTE(review): message names a method that is not called here - confirm wording
+ }
+
+ CollectionScope currentScope = (CollectionScope) applicationScope;
+ MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = getMigration();
+ // for each element in the history held by the source strategy (migration.from()),
+ // copy it to the target strategy (migration.to())
+ Iterator<MvccEntity> allVersions = migration.from()
+ .loadDescendingHistory(currentScope, entityId, now,
+ 1000); // fetch size per trip to the store
+
+ while (allVersions.hasNext()) {
+ final MvccEntity version = allVersions.next();
+
+ final MutationBatch versionBatch =
+ migration.to().write(currentScope, version);
+
+ totalBatch.mergeShallow(versionBatch);
+
+ if (atomicLong.incrementAndGet() % 50 == 0) { // flush every 50 copied versions
+ executeBatch(totalBatch, observer, atomicLong);
+ }
+ }
+ executeBatch(totalBatch, observer, atomicLong); // flush whatever remains for this entity
+ return entityId;
+ }
+ })
+ .map(new Func1<Id, Long>() {
+ @Override
+ public Long call(Id id) {
+ executeBatch(totalBatch, observer, atomicLong); // NOTE(review): the batch was just executed at the end of the previous step; looks redundant - confirm
+ return atomicLong.get(); // emit the running total of versions copied
+ }
+ });
+ }
+
+ protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po, final AtomicLong count ) { // executes the batch, then reports progress (or failure) to the observer
+ try {
+ batch.execute();
+
+ po.update( getVersion(), "Finished copying " + count + " entities to the new format" ); // NOTE(review): count is incremented once per version copied in migrate(), although the message says "entities"
+ }
+ catch ( ConnectionException e ) {
+ po.failed( getVersion(), "Failed to execute mutation in cassandra" ); // report the failure before rethrowing
+ throw new DataMigrationException( "Unable to migrate batches ", e ); // the original exception is kept as the cause
+ }
+ }
+
+}
+