You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:20:06 UTC
[16/50] [abbrv] incubator-usergrid git commit: Pushed migration tests
down to core tiers. WIP
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
deleted file mode 100644
index f390723..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.usergrid.corepersistence.CpSetup;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.migration.data.*;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeDataMigrationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.EntityWriteHelper;
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.inject.Injector;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import rx.functions.Action1;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
-@NotThreadSafe
-public class GraphShardVersionMigrationIT extends AbstractCoreIT {
-
- private Injector injector;
- private ApplicationDataMigration graphShardVersionMigration;
- private ManagerCache managerCache;
- private DataMigrationManager dataMigrationManager;
- private MigrationInfoSerialization migrationInfoSerialization;
-
-
- /**
- * Rule to do the resets we need
- */
- @Rule
- public MigrationTestRule migrationTestRule = new MigrationTestRule( app, SpringResource.getInstance().getBean(Injector.class) ,EdgeDataMigrationImpl.class );
- private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
- private ApplicationObservable applicationObservable;
-
-
- @Before
- public void setup() {
-
- injector = SpringResource.getInstance().getBean( Injector.class );
- graphShardVersionMigration = injector.getInstance( EdgeDataMigrationImpl.class );
- managerCache = injector.getInstance( ManagerCache.class );
- dataMigrationManager = injector.getInstance( DataMigrationManager.class );
- migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
- allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
- applicationObservable = injector.getInstance(ApplicationObservable.class);
-
- }
-
-
- @Test
- @Ignore("Ignored awaiting fix for USERGRID-268")
- public void testIdMapping() throws Throwable {
-
- assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
- assertEquals( "Previous version expected", 1, dataMigrationManager.getCurrentVersion());
-
-
-
- final EntityManager newAppEm = app.getEntityManager();
-
- final String type1 = "type1thing";
- final String type2 = "type2thing";
- final int size = 10;
-
- final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
- final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
-
-
- final Set<Id> allEntities = new HashSet<>();
- allEntities.addAll( type1Identities );
- allEntities.addAll( type2Identities );
-
-
- final TestProgressObserver progressObserver = new TestProgressObserver();
-
-
- //used to validate 1.0 types, and 2.0 types
- final Multimap<Id, String> sourceTypes = HashMultimap.create( 10000, 10 );
- final Multimap<Id, String> targetTypes = HashMultimap.create( 10000, 10 );
-
-
- //read everything in previous version format and put it into our types.
-
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public void call(
- final ApplicationEntityGroup<CollectionScope> entity ) {
-
- final GraphManager gm =
- managerCache.getGraphManager( entity.applicationScope );
-
- for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource( new SimpleSearchEdgeType(idScope.getId(), null, null ) )
- .doOnNext(new Action1<String>() {
- @Override
- public void call(final String s) {
- sourceTypes.put(idScope.getId(), s);
- }
- }).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( idScope.getId(), null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.put( idScope.getId(), s );
- }
- } ).toBlocking().lastOrDefault( null );
-
- allEntities.remove( idScope.getId() );
- }
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- //perform the migration
-
- graphShardVersionMigration.migrate(applicationObservable.getAllApplicationScopes(), progressObserver).toBlocking().last();
-
- assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
- assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
- assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
- //write the status and version, then invalidate the cache so it appears
- migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
- migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
- dataMigrationManager.invalidate();
-
- assertEquals( "New version saved, and we should get new implementation",
- graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
-
-
- //now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public void call(
- final ApplicationEntityGroup<CollectionScope> entity ) {
-
- final GraphManager gm =
- managerCache.getGraphManager( entity.applicationScope );
-
- for ( final EntityIdScope<CollectionScope> idScope : entity.entityIds ) {
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( idScope.getId(), null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- sourceTypes.remove( idScope.getId(), s );
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget(
- new SimpleSearchEdgeType( idScope.getId(), null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.remove( idScope.getId(), s );
- }
- } ).toBlocking().lastOrDefault( null );
- }
- }
- }
-
-
- ).toBlocking().lastOrDefault( null );
-
-
- assertEquals( "All source types migrated", 0, sourceTypes.size() );
-
-
- assertEquals( "All target types migrated", 0, targetTypes.size() );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
index 3071b72..67f89ab 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/MigrationTestRule.java
@@ -25,8 +25,9 @@ import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.apache.usergrid.CoreApplication;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Injector;
@@ -45,7 +46,7 @@ public class MigrationTestRule extends ExternalResource {
protected final CoreApplication core;
protected final DataMigrationManager dataMigrationManager;
- protected final DataMigration dataMigration;
+ protected final DataMigration2 dataMigration;
protected int currentVersion;
@@ -55,10 +56,8 @@ public class MigrationTestRule extends ExternalResource {
*
* @param core the CoreApplication rule used in this test
* @param injector The injector used in this test
- * @param dataMigrationClass The data migration class that is under test
*/
- public MigrationTestRule( final CoreApplication core, final Injector injector,
- final Class<? extends DataMigration> dataMigrationClass ) {
+ public MigrationTestRule( final CoreApplication core, final Injector injector ) {
this.core = core;
this.dataMigrationManager = injector.getInstance( DataMigrationManager.class );
this.dataMigration = injector.getInstance( dataMigrationClass );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
deleted file mode 100644
index c7b69a1..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-
-
-public class TestProgressObserver implements DataMigration.ProgressObserver {
-
- private boolean failed = false;
-
-
- private List<String> updates = new ArrayList<>( 100 );
-
-
- @Override
- public void failed( final int migrationVersion, final String reason ) {
- failed = true;
- }
-
-
- @Override
- public void failed( final int migrationVersion, final String reason, final Throwable throwable ) {
- failed = true;
- }
-
-
- @Override
- public void update( final int migrationVersion, final String message ) {
- updates.add( message );
- }
-
-
- /**
- * Get if we failed
- * @return
- */
- public boolean getFailed() {
- return failed;
- }
-
-
- /**
- * Get update messages
- * @return
- */
- public List<String> getUpdates() {
- return updates;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index ca14bd0..7a6ec22 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,26 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
import java.util.HashSet;
import java.util.Set;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Injector;
@@ -59,12 +53,10 @@ import static org.junit.Assert.assertTrue;
*/
public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
- private final Logger logger = LoggerFactory.getLogger( AllEntitiesInSystemObservableIT.class );
-
@Test
public void testEntities() throws Exception {
Injector injector = SpringResource.getInstance().getBean(Injector.class);
- AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl =injector.getInstance(AllEntitiesInSystemObservable.class);
+ AllEntitiesInSystemImpl allEntitiesInSystemObservableImpl =injector.getInstance(AllEntitiesInSystemImpl.class);
TargetIdObservable targetIdObservable = injector.getInstance(TargetIdObservable.class);
final EntityManager em = app.getEntityManager();
@@ -99,34 +91,25 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
final ApplicationScope scope = CpNamingUtils.getApplicationScope( app.getId() );
- final Id applicationId = scope.getApplication();
final GraphManager gm = managerCache.getGraphManager( scope );
- allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
+ allEntitiesInSystemObservableImpl.getData().doOnNext( new Action1<EntityIdScope>() {
@Override
- public void call( final ApplicationEntityGroup<CollectionScope> entity ) {
-
- assertNotNull(entity);
- assertNotNull(entity.applicationScope);
- assertNotNull(entity.entityIds);
-
- //not from our test, don't check it
- if(!applicationId.equals( entity.applicationScope.getApplication() )){
- return;
- }
-
- for(EntityIdScope<CollectionScope> idScope : entity.entityIds) {
+ public void call( final EntityIdScope entityIdScope ) {
+ assertNotNull(entityIdScope);
+ assertNotNull(entityIdScope.getCollectionScope());
+ assertNotNull(entityIdScope.getId());
//we should only emit each node once
- if ( idScope.getId().getType().equals( type1 ) ) {
- assertTrue( "Element should be present on removal", type1Identities.remove(idScope.getId() ) );
+ if ( entityIdScope.getId().getType().equals( type1 ) ) {
+ assertTrue( "Element should be present on removal", type1Identities.remove(entityIdScope.getId() ) );
}
- else if ( idScope.getId().getType().equals( type2 ) ) {
- assertTrue( "Element should be present on removal", type2Identities.remove(idScope.getId() ) );
+ else if ( entityIdScope.getId().getType().equals( type2 ) ) {
+ assertTrue( "Element should be present on removal", type2Identities.remove(entityIdScope.getId() ) );
}
- }
+
}
} ).toBlocking().lastOrDefault( null );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index 081bbed..649f518 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,15 +24,15 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Injector;
@@ -52,7 +52,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
final Application createdApplication = app.getEntityManager().getApplication();
- ApplicationObservable applicationObservable =SpringResource.getInstance().getBean(Injector.class).getInstance(ApplicationObservable.class);
+ AllApplicationsObservable applicationObservable =SpringResource.getInstance().getBean(Injector.class).getInstance(AllApplicationsObservable.class);
//now our get all apps we expect. There may be more, but we don't care about those.
final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -67,13 +67,13 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
//clean up our wiring
ManagerCache managerCache = SpringResource.getInstance().getBean( Injector.class ).getInstance( ManagerCache.class );
- Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
+ Observable<ApplicationScope> appObservable = applicationObservable.getAllApplications();
- appObservable.doOnNext( new Action1<Id>() {
+ appObservable.doOnNext( new Action1<ApplicationScope>() {
@Override
- public void call( final Id id ) {
- applicationIds.remove( id.getUuid() );
- assertEquals("Correct application type expected" , Application.ENTITY_TYPE, id.getType() );
+ public void call( final ApplicationScope id ) {
+ applicationIds.remove( id.getApplication().getUuid() );
+ assertEquals("Correct application type expected" , Application.ENTITY_TYPE, id.getApplication().getType() );
}
} ).toBlocking().lastOrDefault( null );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
index e518298..6d32d73 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/MvccLogEntry.java
@@ -65,6 +65,7 @@ public interface MvccLogEntry {
/**
* The logentry being written represents a partial entity
*/
+ @Deprecated//removed in v3
PARTIAL(1),
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 7d78177..d5478d4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -89,28 +89,15 @@ public abstract class CollectionModule extends AbstractModule {
configureMigrationProvider();
}
-
- @Provides
- @Singleton
- @Inject
- @Write
- public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
- final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-
- return writeStart;
- }
-
-
- //TODO USERGRID-405, remove this, it's no longer supported
- @Provides
- @Singleton
- @Inject
- @WriteUpdate
- public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
- final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
-
- return writeStart;
- }
+//
+// @Provides
+// @Singleton
+// @Inject
+// public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
+// final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+//
+// return writeStart;
+// }
@Inject
@Singleton
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
deleted file mode 100644
index ad752af..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/Write.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface Write {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
deleted file mode 100644
index 0ba3991..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/WriteUpdate.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.usergrid.persistence.collection.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface WriteUpdate {}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fa36f42..10d85f8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -32,8 +32,6 @@ import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
@@ -66,7 +64,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final WriteStart writeStart;
- private final WriteStart writeUpdate;
private final WriteUniqueVerify writeVerifyUnique;
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
@@ -91,7 +88,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
//create the target EM that will perform logic
final EntityCollectionManager target = new EntityCollectionManagerImpl(
- writeStart, writeUpdate, writeVerifyUnique,
+ writeStart, writeVerifyUnique,
writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
entitySerializationStrategy, uniqueValueSerializationStrategy,
mvccLogEntrySerializationStrategy, keyspace, serializationFig,entityVersionCleanupFactory,
@@ -107,8 +104,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
@Inject
- public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
+ public EntityCollectionManagerFactoryImpl( final WriteStart writeStart,
final WriteUniqueVerify writeVerifyUnique,
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback,
@@ -125,7 +121,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final SerializationFig serializationFig) {
this.writeStart = writeStart;
- this.writeUpdate = writeUpdate;
this.writeVerifyUnique = writeVerifyUnique;
this.writeOptimisticVerify = writeOptimisticVerify;
this.writeCommit = writeCommit;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index e483cc1..f496cac 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -28,12 +28,13 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
+import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.Write;
-import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccValidationUtils;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -44,11 +45,14 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -65,12 +69,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
-import org.apache.usergrid.persistence.collection.EntityDeletedFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCleanupFactory;
-import org.apache.usergrid.persistence.collection.EntityVersionCreatedFactory;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import rx.Observable;
import rx.Subscriber;
@@ -92,7 +90,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
//start stages
private final WriteStart writeStart;
- private final WriteStart writeUpdate;
private final WriteUniqueVerify writeVerifyUnique;
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
@@ -117,8 +114,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
@Inject
public EntityCollectionManagerImpl(
- @Write final WriteStart writeStart,
- @WriteUpdate final WriteStart writeUpdate,
+ final WriteStart writeStart,
final WriteUniqueVerify writeVerifyUnique,
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit,
@@ -142,7 +138,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
MvccValidationUtils.validateCollectionScope( collectionScope );
this.writeStart = writeStart;
- this.writeUpdate = writeUpdate;
this.writeVerifyUnique = writeVerifyUnique;
this.writeOptimisticVerify = writeOptimisticVerify;
this.writeCommit = writeCommit;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
index 8cd21e1..92dc69d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStart.java
@@ -38,7 +38,6 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
private final MvccLogEntrySerializationStrategy logStrategy;
- MvccEntity.Status status;
/**
@@ -46,9 +45,8 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
*/
@Inject
- public WriteStart ( final MvccLogEntrySerializationStrategy logStrategy, MvccEntity.Status status) {
+ public WriteStart ( final MvccLogEntrySerializationStrategy logStrategy) {
this.logStrategy = logStrategy;
- this.status = status;
}
@@ -69,7 +67,7 @@ public class WriteStart implements Func1<CollectionIoEvent<Entity>, CollectionIo
MutationBatch write = logStrategy.write( collectionScope, startEntry );
- final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, newVersion, status, entity );
+ final MvccEntityImpl nextStage = new MvccEntityImpl( entityId, newVersion, MvccEntity.Status.COMPLETE, entity );
if(ioEvent.getEvent().hasVersion()) {
try {
write.execute();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 108a4d8..7580a26 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -334,56 +334,6 @@ public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope
.toBlocking()
.last();
}
- } ).doOnNext(
- new Action1<List<GroupedObservable<Id, EntityToSaveMessage>>>() {
- @Override
- public void call(
- final List<GroupedObservable<Id, EntityToSaveMessage>> groupedObservables ) {
-
- for ( final GroupedObservable<Id, EntityToSaveMessage> group : groupedObservables ) {
-
- //get the highest
- // entity and run a
- // cleanup task on it
- final EntityToSaveMessage
- maxEntity =
- group.toBlocking()
- .last();
-
- final EntityVersionCleanupTask
- task =
- entityVersionCleanupFactory
- .getTask(
- maxEntity.scope,
- maxEntity.entity
- .getId(),
- maxEntity.entity
- .getVersion() );
-
- /**
- * just run the
- * call in this
- * process, we're
- * already
- * doing parallel
- * this forces a
- * repair of the
- * unique properties,
- and will bring us
- to a consistent
- state after the
- */
-
- try {
- task.call();
- }
- catch ( Exception e ) {
- throw new RuntimeException(
- "Unable to run cleanup task",
- e );
- }
- }
- }
} ).
reduce( 0l,
new Func2<Long, List<GroupedObservable<Id, EntityToSaveMessage>>, Long>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
index a683d23..c00c82a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteStartTest.java
@@ -68,7 +68,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
final Entity entity = TestEntityGenerator.generateEntity();
//run the stage
- WriteStart newStage = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+ WriteStart newStage = new WriteStart( logStrategy);
//verify the observable is correct
@@ -113,7 +113,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
//set up the mock to return the entity from the start phase
final Entity entity = TestEntityGenerator.generateEntity(new SimpleId(UUID.randomUUID(),"test"),null);
//run the stage
- WriteStart newStage = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
+ WriteStart newStage = new WriteStart( logStrategy );
//verify the observable is correct
CollectionIoEvent<MvccEntity> result = newStage.call( new CollectionIoEvent<Entity>( context, entity ) );
@@ -140,7 +140,7 @@ public class WriteStartTest extends AbstractEntityStageTest {
protected void validateStage( final CollectionIoEvent<Entity> event ) {
final MvccLogEntrySerializationStrategy logStrategy = mock( MvccLogEntrySerializationStrategy.class );
- new WriteStart( logStrategy, MvccEntity.Status.COMPLETE ).call( event );
+ new WriteStart( logStrategy ).call( event );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
new file mode 100644
index 0000000..dde67cb
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationV1ToV3ImplTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.UUID;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.CollectionDataVersions;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV1Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
+import org.apache.usergrid.persistence.core.guice.DataMigrationResetRule;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import rx.Observable;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Integration test that migrates entity data written with the V1 serialization strategy
+ * and verifies it can be read back through the V3 strategy.
+ *
+ * The class also acts as the {@link DataMigrationResetRule.DataMigrationManagerProvider}
+ * for its own reset rule, handing the rule the injected {@link DataMigrationManager}.
+ */
+@NotThreadSafe
+@RunWith( ITRunner.class )
+@UseModules( { TestCollectionModule.class } )
+public class MvccEntityDataMigrationV1ToV3ImplTest implements DataMigrationResetRule.DataMigrationManagerProvider {
+
+    @Inject
+    public DataMigrationManager dataMigrationManager;
+
+    @Inject
+    private MvccEntitySerializationStrategyV1Impl v1Impl;
+
+    @Inject
+    private MvccEntitySerializationStrategyV3Impl v3Impl;
+
+    @Inject
+    public MvccEntityDataMigrationImpl mvccEntityDataMigrationImpl;
+
+    @Inject
+    public VersionedMigrationSet<MvccEntitySerializationStrategy> versions;
+
+    /**
+     * Rule to do the resets we need: puts the collection plugin at the INITIAL data version
+     * before the test and restores the previous version afterwards.
+     */
+    @Rule
+    public DataMigrationResetRule migrationTestRule =
+        new DataMigrationResetRule( this, CollectionMigrationPlugin.PLUGIN_NAME,
+            CollectionDataVersions.INITIAL.getVersion() );
+
+
+    /**
+     * Writes two entities with the V1 strategy, runs the migration starting from the INITIAL
+     * version, then checks the reported version, the progress observer state, the data as
+     * read by the V3 strategy, and the from/to pair registered for the new version.
+     */
+    @Test
+    public void testMigration() throws ConnectionException {
+
+        final Id applicationId = createId( "application" );
+        final String collectionName = "things";
+
+        final CollectionScope scope = new CollectionScopeImpl( applicationId, applicationId, collectionName );
+
+        final MvccEntity entity1 = getEntity( "thing" );
+        final MvccEntity entity2 = getEntity( "thing" );
+
+        v1Impl.write( scope, entity1 ).execute();
+        v1Impl.write( scope, entity2 ).execute();
+
+
+        //sanity check: the data is readable in the old format before we migrate
+        MvccEntity returned1 = v1Impl.load( scope, entity1.getId() ).get();
+        MvccEntity returned2 = v1Impl.load( scope, entity2.getId() ).get();
+
+        assertEquals( "Same entity", entity1, returned1 );
+        assertEquals( "Same entity", entity2, returned2 );
+
+        final Observable<EntityIdScope> entityIdScope = Observable.from( new EntityIdScope( scope, entity1.getId() ),
+            new EntityIdScope( scope, entity2.getId() ) );
+
+
+        //feed the migration only the two entities written above
+        final MigrationDataProvider<EntityIdScope> migrationProvider = new MigrationDataProvider<EntityIdScope>() {
+            @Override
+            public Observable<EntityIdScope> getData() {
+                return entityIdScope;
+            }
+        };
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+        //now migration
+        final int newVersion = mvccEntityDataMigrationImpl
+            .migrate( CollectionDataVersions.INITIAL.getVersion(), migrationProvider, progressObserver );
+
+
+        //expected value first, actual second, so a failure reports the values the right way round
+        assertEquals( "Correct version returned", CollectionDataVersions.LOG_REMOVAL.getVersion(), newVersion );
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //now verify we can read the data correctly in the new version
+        returned1 = v3Impl.load( scope, entity1.getId() ).get();
+        returned2 = v3Impl.load( scope, entity2.getId() ).get();
+
+        assertEquals( "Same entity", entity1, returned1 );
+        assertEquals( "Same entity", entity2, returned2 );
+
+        //verify the tuple is correct
+        final MigrationRelationship<MvccEntitySerializationStrategy> tuple =
+            versions.getMigrationRelationship( newVersion );
+
+        assertSame( "Same instance for from", v1Impl, tuple.from );
+        assertSame( "Same instance for to", v3Impl, tuple.to );
+    }
+
+
+    /**
+     * Builds a COMPLETE entity of the given type with a fresh id and a new time UUID version.
+     *
+     * @param type The entity type to use in the generated id
+     * @return The entity, not yet written to any store
+     */
+    private MvccEntity getEntity( final String type ) {
+
+        final SimpleId entityId = new SimpleId( type );
+        final UUID version = UUIDGenerator.newTimeUUID();
+        final Entity entity = new Entity( entityId );
+
+        return new MvccEntityImpl( entityId, version, MvccEntity.Status.COMPLETE, entity );
+    }
+
+
+    /**
+     * Supplies the injected manager to the reset rule.
+     */
+    @Override
+    public DataMigrationManager getDataMigrationManager() {
+        return dataMigrationManager;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
new file mode 100644
index 0000000..0167ff6
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/DataMigrationResetRule.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.guice;
+
+
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * A test rule that records the plugin's current data version, resets the plugin to a
+ * specific version before test invocation, then sets it back to the recorded version
+ * afterwards.
+ */
+public class DataMigrationResetRule extends ExternalResource {
+    private static final Logger logger = LoggerFactory.getLogger( DataMigrationResetRule.class );
+
+
+    private final DataMigrationManagerProvider dataMigrationManagerProvider;
+
+    private final String pluginName;
+
+    private final int versionToSet;
+
+    /** Version found in before(); -1 until before() has run. */
+    private int existingVersion = -1;
+
+
+    /**
+     * @param dataMigrationManagerProvider Source of the migration manager, queried when the rule runs
+     * @param pluginName The migration plugin whose version is manipulated
+     * @param versionToSet The version the plugin is reset to before the test
+     */
+    public DataMigrationResetRule( final DataMigrationManagerProvider dataMigrationManagerProvider,
+                                   final String pluginName, final int versionToSet ) {
+        this.dataMigrationManagerProvider = dataMigrationManagerProvider;
+        this.pluginName = pluginName;
+        this.versionToSet = versionToSet;
+    }
+
+
+    /**
+     * Remembers the plugin's current version, then resets the plugin to the requested one.
+     */
+    @Override
+    protected void before() throws MigrationException {
+
+        //resolve the manager here, not in the constructor: the provider only has it once the rule runs
+        final DataMigrationManager manager = dataMigrationManagerProvider.getDataMigrationManager();
+
+        existingVersion = manager.getCurrentVersion( pluginName );
+
+        manager.resetToVersion( pluginName, versionToSet );
+
+        //no migration runs here, only a version reset, so say that in the log
+        logger.info( "Reset plugin {} to version {}", pluginName, versionToSet );
+    }
+
+
+    /**
+     * Puts the plugin back to the version recorded in before().
+     */
+    @Override
+    protected void after() {
+        dataMigrationManagerProvider.getDataMigrationManager().resetToVersion( pluginName, existingVersion );
+    }
+
+
+    /**
+     * Interface for getting a data migration manager during testing. Ugly, but required because we
+     * can't inject into this member
+     */
+    public interface DataMigrationManagerProvider {
+
+        /**
+         * Get the data migration manager
+         *
+         * @return The manager used to read and reset plugin versions
+         */
+        DataMigrationManager getDataMigrationManager();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
new file mode 100644
index 0000000..63fad42
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.migration.data;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+
+
+/**
+ * Progress observer for tests: remembers whether a failure was reported and keeps every
+ * update message so assertions can inspect them after the migration returns.
+ *
+ * Callbacks can arrive from several threads when a migration does its work in parallel,
+ * so the failure flag is volatile and the message list is a synchronized list.
+ */
+public class TestProgressObserver implements ProgressObserver {
+
+    private volatile boolean failed = false;
+
+
+    //fully qualified so this class needs no additional import
+    private final List<String> updates =
+        java.util.Collections.synchronizedList( new ArrayList<String>( 100 ) );
+
+
+    /**
+     * Records that the migration failed; the reason is not retained.
+     */
+    @Override
+    public void failed( final int migrationVersion, final String reason ) {
+        failed = true;
+    }
+
+
+    /**
+     * Records that the migration failed; the reason and throwable are not retained.
+     */
+    @Override
+    public void failed( final int migrationVersion, final String reason, final Throwable throwable ) {
+        failed = true;
+    }
+
+
+    /**
+     * Appends the message to the list of updates received so far.
+     */
+    @Override
+    public void update( final int migrationVersion, final String message ) {
+        updates.add( message );
+    }
+
+
+    /**
+     * Get if we failed
+     *
+     * @return true once either failed callback has been invoked
+     */
+    public boolean getFailed() {
+        return failed;
+    }
+
+
+    /**
+     * Get update messages
+     *
+     * @return The live synchronized list of messages, in the order they were added. Iterate it
+     * only after the migration has finished, or while synchronizing on the returned list.
+     */
+    public List<String> getUpdates() {
+        return updates;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
new file mode 100644
index 0000000..1b117a9
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/util/IdGenerator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.util;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+
+/**
+ * Static helpers that build {@link Id} instances for tests.
+ */
+public class IdGenerator {
+
+    /**
+     * Create an id of the given type backed by a newly generated time UUID.
+     *
+     * @param type The type of id
+     */
+    public static Id createId( String type ) {
+        final UUID timeUuid = UUIDGenerator.newTimeUUID();
+
+        return createId( timeUuid, type );
+    }
+
+
+    /**
+     * Create an id from an explicit uuid and type.
+     *
+     * @param id The uuid in the id
+     * @param type The type of id
+     */
+    public static Id createId( UUID id, String type ) {
+        final Id generated = new SimpleId( id, type );
+
+        return generated;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 67459b2..d504e47 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -47,7 +47,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
-import org.apache.usergrid.persistence.graph.serialization.impl.EdgeDataMigrationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.EdgeDataMigrationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV1Impl;
import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
@@ -57,6 +57,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.GraphManagerFact
import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservableImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -130,8 +131,8 @@ public abstract class GraphModule extends AbstractModule {
//wire up the edg migration
- Multibinder<DataMigration2<ApplicationScope>> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<ApplicationScope>>() {} );
+ Multibinder<DataMigration2<GraphNode>> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<GraphNode>>() {} );
dataMigrationMultibinder.addBinding().to( EdgeDataMigrationImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
deleted file mode 100644
index 49a954d..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.graph.serialization.impl;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
-import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.functions.Func2;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Encapsulates the migration of edge meta data
- */
-public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
-
- private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
-
- private final Keyspace keyspace;
- private final GraphManagerFactory graphManagerFactory;
- private final EdgesObservable edgesFromSourceObservable;
- private final VersionedMigrationSet<EdgeMetadataSerialization> allVersions;
- private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
-
- @Inject
- public EdgeDataMigrationImpl( final Keyspace keyspace, final GraphManagerFactory graphManagerFactory,
- final EdgesObservable edgesFromSourceObservable,
-
- final VersionedMigrationSet<EdgeMetadataSerialization> allVersions,
- final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
-
- this.keyspace = keyspace;
- this.graphManagerFactory = graphManagerFactory;
- this.edgesFromSourceObservable = edgesFromSourceObservable;
- this.allVersions = allVersions;
- this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
- }
-
-
-
-
- @Override
- public int migrate( final int currentVersion, final MigrationDataProvider<ApplicationScope> migrationDataProvider,
- final ProgressObserver observer ) {
-
- final AtomicLong counter = new AtomicLong();
-
- final MigrationRelationship<EdgeMetadataSerialization>
- migration = allVersions.getMigrationRelationship( currentVersion );
-
- final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap(new Func1<ApplicationScope, Observable<List<Edge>>>() {
- @Override
- public Observable<List<Edge>> call(final ApplicationScope applicationScope) {
- final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- final Observable<Edge> edgesFromSource =
- edgesFromSourceObservable.edgesFromSource( gm, applicationScope.getApplication() );
- logger.info( "Migrating edges scope {}", applicationScope );
-
- //get each edge from this node as a source
- return edgesFromSource
-
- //for each edge, re-index it in v2 every 1000 edges or less
- .buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
- @Override
- public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
- return listObservable.doOnNext( new Action1<List<Edge>>() {
- @Override
- public void call( List<Edge> edges ) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( Edge edge : edges ) {
- logger.info( "Migrating meta for edge {}", edge );
- final MutationBatch edgeBatch =
- migration.to.writeEdge( applicationScope, edge );
- batch.mergeShallow( edgeBatch );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to perform migration", e );
- }
-
- //update the observer so the admin can see it
- final long newCount = counter.addAndGet( edges.size() );
-
- observer.update( migration.to.getImplementationVersion(),
- String.format( "Currently running. Rewritten %d edge types",
- newCount ) );
- }
- } );
- }
- } );
- }});
-
- observable.longCount().toBlocking().last();
-
- return migration.to.getImplementationVersion();
-
- }
-
-
-
-
- @Override
- public boolean supports( final int currentVersion ) {
- return currentVersion < edgeMetadataSerializationV2.getImplementationVersion();
- }
-
-
- @Override
- public int getMaxVersion() {
- //we only support up to v2 ATM
- return edgeMetadataSerializationV2.getImplementationVersion();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
new file mode 100644
index 0000000..af157a6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.migration;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationRelationship;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedMigrationSet;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Encapsulates the migration of edge meta data.
+ *
+ * The migration assumes that the data provider will visit every node in the graph;
+ * all edges from each of these source nodes will then be re-indexed.
+ */
+public class EdgeDataMigrationImpl implements DataMigration2<GraphNode> {
+
+ private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
+
+ private final Keyspace keyspace;
+ private final GraphManagerFactory graphManagerFactory;
+ private final EdgesObservable edgesFromSourceObservable;
+ private final VersionedMigrationSet<EdgeMetadataSerialization> allVersions;
+ private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
+
+ @Inject
+ public EdgeDataMigrationImpl( final Keyspace keyspace,
+ final GraphManagerFactory graphManagerFactory,
+ final EdgesObservable edgesFromSourceObservable,
+ final VersionedMigrationSet<EdgeMetadataSerialization> allVersions,
+ final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
+
+ this.keyspace = keyspace;
+ this.graphManagerFactory = graphManagerFactory;
+ this.edgesFromSourceObservable = edgesFromSourceObservable;
+ this.allVersions = allVersions;
+ this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
+ }
+
+
+
+
+ @Override
+ public int migrate( final int currentVersion, final MigrationDataProvider<GraphNode> migrationDataProvider,
+ final ProgressObserver observer ) {
+
+ final AtomicLong counter = new AtomicLong();
+
+ final MigrationRelationship<EdgeMetadataSerialization>
+ migration = allVersions.getMigrationRelationship( currentVersion );
+
+ final Observable<List<Edge>> observable = migrationDataProvider.getData().flatMap( new Func1<GraphNode,
+ Observable<List<Edge>>>() {
+ @Override
+ public Observable<List<Edge>> call( final GraphNode graphNode ) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
+
+ //get edges from the source
+ return edgesFromSourceObservable.edgesFromSource( gm, graphNode.entryNode ).buffer( 1000 ).parallel( new Func1<Observable<List<Edge>>, Observable<List<Edge>>>() {
+ @Override
+ public Observable<List<Edge>> call( final Observable<List<Edge>> listObservable ) {
+ return listObservable.doOnNext( new Action1<List<Edge>>() {
+ @Override
+ public void call( List<Edge> edges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( Edge edge : edges ) {
+ logger.info( "Migrating meta for edge {}", edge );
+ final MutationBatch edgeBatch =
+ migration.to.writeEdge( graphNode.applicationScope, edge );
+ batch.mergeShallow( edgeBatch );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to perform migration", e );
+ }
+
+ //update the observer so the admin can see it
+ final long newCount = counter.addAndGet( edges.size() );
+
+ observer.update( migration.to.getImplementationVersion(),
+ String.format( "Currently running. Rewritten %d edge types",
+ newCount ) );
+ }
+ } );
+ } } );
+ }} );
+
+ observable.longCount().toBlocking().last();
+
+ return migration.to.getImplementationVersion();
+
+ }
+
+
+
+
+ @Override
+ public boolean supports( final int currentVersion ) {
+ return currentVersion < edgeMetadataSerializationV2.getImplementationVersion();
+ }
+
+
+ @Override
+ public int getMaxVersion() {
+ //we only support up to v2 ATM
+ return edgeMetadataSerializationV2.getImplementationVersion();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
index 0d3405b..c989822 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphMigrationPlugin.java
@@ -30,7 +30,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializ
import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -40,15 +39,15 @@ import com.google.inject.Singleton;
* Migration plugin for the collection module
*/
@Singleton
-public class GraphMigrationPlugin extends AbstractMigrationPlugin<ApplicationScope> {
+public class GraphMigrationPlugin extends AbstractMigrationPlugin<GraphNode> {
public static final String PLUGIN_NAME = "graph-data";
@Inject
- public GraphMigrationPlugin( final Set<DataMigration2<ApplicationScope>> entityDataMigrations,
- final MigrationDataProvider<ApplicationScope> entityIdScopeDataMigrationProvider,
+ public GraphMigrationPlugin( final Set<DataMigration2<GraphNode>> entityDataMigrations,
+ final MigrationDataProvider<GraphNode> entityIdScopeDataMigrationProvider,
final MigrationInfoSerialization migrationInfoSerialization ) {
super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
new file mode 100644
index 0000000..00aa617
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/GraphNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.migration;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Pairs an application scope with the entry node in that scope's graph from which edges are read.
+ */
+public class GraphNode {
+ public final ApplicationScope applicationScope;
+ public final Id entryNode;
+
+
+ public GraphNode( final ApplicationScope applicationScope, final Id entryNode ) {
+ this.applicationScope = applicationScope;
+ this.entryNode = entryNode;
+ }
+}