You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/19 23:19:58 UTC
[08/50] [abbrv] incubator-usergrid git commit: WIP,
still needs to be refactored and cleaned up.
WIP, still needs to be refactored and cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d3f8ee61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d3f8ee61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d3f8ee61
Branch: refs/heads/USERGRID-493
Commit: d3f8ee6184e2a85dc669e836201496748bf35001
Parents: 6652fe5
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Feb 26 18:57:03 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Feb 26 18:57:03 2015 -0700
----------------------------------------------------------------------
.../mvcc/MvccEntityMigrationStrategy.java | 30 --
.../MvccEntitySerializationStrategy.java | 3 +-
.../impl/MvccEntityDataMigrationImpl.java | 179 -----------
...vccEntitySerializationStrategyProxyImpl.java | 18 +-
...cEntitySerializationStrategyProxyV1Impl.java | 62 ----
...cEntitySerializationStrategyProxyV2Impl.java | 43 ---
.../serialization/impl/SerializationModule.java | 22 +-
.../migration/CollectionMigrationPlugin.java | 68 +++++
.../impl/migration/EntityIdScope.java | 49 +++
.../migration/MvccEntityDataMigrationImpl.java | 179 +++++++++++
.../persistence/core/guice/CommonModule.java | 8 +-
.../data/ApplicationDataMigration.java | 2 +-
.../migration/data/CollectionDataMigration.java | 2 +-
.../core/migration/data/DataMigration.java | 94 ------
.../migration/data/DataMigrationManager.java | 15 +-
.../data/DataMigrationManagerImpl.java | 304 ++++++-------------
.../data/MigrationInfoSerialization.java | 30 +-
.../data/MigrationInfoSerializationImpl.java | 84 ++---
.../migration/data/newimpls/DataMigration2.java | 26 +-
.../data/newimpls/MigrationPlugin.java | 12 +-
.../data/newimpls/MigrationRelationship.java | 41 +++
.../data/newimpls/ProgressObserver.java | 50 +++
.../migration/data/newimpls/VersionedData.java | 38 +++
.../migration/data/newimpls/VersionedSet.java | 113 +++++++
.../migration/schema/MigrationStrategy.java | 60 ----
.../core/rx/AllEntitiesInSystemObservable.java | 2 +-
.../core/scope/ApplicationEntityGroup.java | 39 ---
.../persistence/core/scope/EntityIdScope.java | 44 ---
.../core/guice/MaxMigrationVersion.java | 1 -
.../persistence/core/guice/TestModule.java | 3 +-
.../data/DataMigrationManagerImplTest.java | 2 -
.../impl/EdgeDataMigrationImpl.java | 111 +++----
32 files changed, 801 insertions(+), 933 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
deleted file mode 100644
index 6fed25d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.collection.mvcc;
-
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
-
-/**
- * Encapsulates the migration path between versions
- */
-public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
index b0b7233..de3cab3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccEntitySerializationStrategy.java
@@ -26,6 +26,7 @@ import java.util.UUID;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.VersionedData;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -36,7 +37,7 @@ import com.netflix.astyanax.MutationBatch;
/**
* The interface that allows us to serialize an entity to disk
*/
-public interface MvccEntitySerializationStrategy extends Migration {
+public interface MvccEntitySerializationStrategy extends Migration, VersionedData {
/**
* Serialize the entity to the data store with the given collection context
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
deleted file mode 100644
index 4dfc64b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntityDataMigrationImpl.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.functions.Func2;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Data migration strategy for entities
- */
-public class MvccEntityDataMigrationImpl implements CollectionDataMigration {
-
- private final Keyspace keyspace;
- private final MvccEntityMigrationStrategy entityMigrationStrategy;
-
-
- @Inject
- public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
-
- this.keyspace = keyspace;
- this.entityMigrationStrategy = serializationStrategy;
- }
-
-
- @Override
- public Observable migrate( final Observable<ApplicationEntityGroup> applicationEntityGroupObservable,
- final DataMigration.ProgressObserver observer ) {
- final AtomicLong atomicLong = new AtomicLong();
-
-
- //capture the time the test starts
- final UUID now = UUIDGenerator.newTimeUUID();
-
- return applicationEntityGroupObservable.flatMap( new Func1<ApplicationEntityGroup, Observable<Long>>() {
- @Override
- public Observable<Long> call( final ApplicationEntityGroup applicationEntityGroup ) {
- final List<EntityIdScope<CollectionScope>> entityIds = applicationEntityGroup.entityIds;
-
- //go through each entity in the system, and load it's entire
- // history
- return Observable.from( entityIds ).subscribeOn( Schedulers.io() )
- .parallel( new Func1<Observable<EntityIdScope<CollectionScope>>, Observable<Long>>() {
-
-
- //process the ids in parallel
- @Override
- public Observable<Long> call(
- final Observable<EntityIdScope<CollectionScope>> entityIdScopeObservable
- ) {
-
- final MutationBatch totalBatch = keyspace.prepareMutationBatch();
-
- return entityIdScopeObservable.doOnNext(
- new Action1<EntityIdScope<CollectionScope>>() {
-
- //load the entity and add it to the toal mutation
- @Override
- public void call( final EntityIdScope<CollectionScope> idScope ) {
-
- //load the entity
- MigrationStrategy
- .MigrationRelationship<MvccEntitySerializationStrategy>
- migration = entityMigrationStrategy.getMigration();
-
-
- CollectionScope currentScope = idScope.getCollectionScope();
- //for each element in the history in the previous
- // version,
- // copy it to the CF in v2
- EntitySet allVersions = migration.from().load( currentScope,
- Collections.singleton( idScope.getId() ), now );
-
- final MvccEntity version =
- allVersions.getEntity( idScope.getId() );
-
- final MutationBatch versionBatch =
- migration.to().write( currentScope, version );
-
- totalBatch.mergeShallow( versionBatch );
- }
- } )
- //every 100 flush the mutation
- .buffer( 100 ).doOnNext(
- new Action1<List<EntityIdScope<CollectionScope>>>() {
- @Override
- public void call(
- final List<EntityIdScope<CollectionScope>> ids ) {
- atomicLong.addAndGet( 100 );
- executeBatch( totalBatch, observer, atomicLong );
- }
- } )
- //count the results
- .reduce( 0l,
- new Func2<Long, List<EntityIdScope<CollectionScope>>, Long>() {
- @Override
- public Long call( final Long aLong,
- final
- List<EntityIdScope<CollectionScope>>
- ids ) {
- return aLong + ids.size();
- }
- } );
- }
- } );
- }
- } );
- }
-
-
- protected void executeBatch( final MutationBatch batch, final DataMigration.ProgressObserver po,
- final AtomicLong count ) {
- try {
- batch.execute();
-
- po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
- }
- catch ( ConnectionException e ) {
- po.failed( getVersion(), "Failed to execute mutation in cassandra" );
- throw new DataMigrationException( "Unable to migrate batches ", e );
- }
- }
-
-
- @Override
- public int getVersion() {
- return entityMigrationStrategy.getVersion();
- }
-
-
- @Override
- public MigrationType getType() {
- return MigrationType.Entities;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
index c18df29..03271a2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyImpl.java
@@ -35,27 +35,24 @@ import java.util.*;
/**
* Version 4 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
+ * migration data goes to both sources and is read from the old source. After the upgrade completes,
* it will be available from the new source
*/
-public abstract class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+public class MvccEntitySerializationStrategyProxyImpl implements MvccEntitySerializationStrategy {
protected final Keyspace keyspace;
- protected final MvccEntitySerializationStrategy previous;
- protected final MvccEntitySerializationStrategy current;
private final MigrationInfoSerialization migrationInfoSerialization;
+ private final
@Inject
public MvccEntitySerializationStrategyProxyImpl(final Keyspace keyspace,
- final MvccEntitySerializationStrategy previous,
- final MvccEntitySerializationStrategy current,
+ final Set<MvccEntitySerializationStrategy> allVersions,
final MigrationInfoSerialization migrationInfoSerialization) {
this.keyspace = keyspace;
- this.previous = previous;
- this.current = current;
+
this.migrationInfoSerialization = migrationInfoSerialization;
}
@@ -159,5 +156,10 @@ public abstract class MvccEntitySerializationStrategyProxyImpl implements MvccEn
return Collections.emptyList();
}
+
+ @Override
+ public int getImplementationVersion() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
deleted file mode 100644
index f2c8d33..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.V1Impl;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-
-
-/**
- * Version 3 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
- * it will be available from the new source
- */
-@Singleton
-public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerializationStrategyProxyImpl implements MvccEntityMigrationStrategy {
-
-
- @Inject
- public MvccEntitySerializationStrategyProxyV1Impl(final Keyspace keyspace,
- @V1Impl final MvccEntitySerializationStrategy previous,
- @V2Impl final MvccEntitySerializationStrategy current,
- final MigrationInfoSerialization migrationInfoSerialization) {
- super( keyspace, previous, current,migrationInfoSerialization);
- }
-
- @Override
- public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
- return new MigrationRelationship<>(previous,current);
- }
-
- @Override
- public int getVersion() {
- return V2Impl.MIGRATION_VERSION;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
deleted file mode 100644
index d897ee4..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.usergrid.persistence.collection.serialization.impl;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.*;
-import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-
-
-/**
- * Version 4 implementation of entity serialization. This will proxy writes and reads so that during
- * migration data goes to both sources and is read from the old source. After the ugprade completes,
- * it will be available from the new source
- */
-@Singleton
-public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerializationStrategyProxyImpl implements MvccEntityMigrationStrategy {
-
-
- @Inject
- public MvccEntitySerializationStrategyProxyV2Impl( final MigrationInfoSerialization migrationInfoSerialization,
- final Keyspace keyspace,
- @V1ProxyImpl final MvccEntitySerializationStrategy previous,
- @V3Impl final MvccEntitySerializationStrategy current) {
- super(keyspace,previous,current,migrationInfoSerialization);
- }
-
- @Override
- public int getVersion() {
- return V3Impl.MIGRATION_VERSION;
- }
-
-
- @Override
- public MigrationRelationship<MvccEntitySerializationStrategy> getMigration() {
- return new MigrationRelationship<>(this.previous,this.current);
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index a86d7ec..a9c493f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -22,9 +22,15 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrate
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.core.guice.*;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.MvccEntityDataMigrationImpl;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V1Impl;
+import org.apache.usergrid.persistence.core.guice.V1ProxyImpl;
+import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.guice.V3Impl;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import com.google.inject.AbstractModule;
@@ -56,10 +62,16 @@ public class SerializationModule extends AbstractModule {
bind(MvccEntitySerializationStrategy.class ).annotatedWith(ProxyImpl.class)
.to(MvccEntitySerializationStrategyProxyV2Impl.class);
- Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+
+
+
+ Multibinder<DataMigration2> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration2.class );
dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
- bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
+
+ Multibinder.newSetBinder( binder(), MigrationPlugin.class).addBinding().to( CollectionMigrationPlugin.class );
+
+
bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
new file mode 100644
index 0000000..5713af7
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/CollectionMigrationPlugin.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ *
+ */
+
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+
+
+/**
+ * Migration plugin for the collection module
+ */
+public class CollectionMigrationPlugin implements MigrationPlugin {
+
+
+ private final DataMigration2<EntityIdScope> entityDataMigration;
+ private final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider;
+
+
+ public CollectionMigrationPlugin( final DataMigration2<EntityIdScope> entityDataMigration,
+ final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider ) {
+ this.entityDataMigration = entityDataMigration;
+ this.entityIdScopeDataMigrationProvider = entityIdScopeDataMigrationProvider;
+ }
+
+
+ @Override
+ public String getName() {
+ return "collections-entity-data";
+ }
+
+
+ @Override
+ public void run( final ProgressObserver observer ) {
+ entityDataMigration.migrate( entityIdScopeDataMigrationProvider, observer );
+ }
+
+
+ @Override
+ public int getMaxVersion() {
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
new file mode 100644
index 0000000..0e70e75
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/EntityIdScope.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+import java.util.Collection;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+/**
+ * Tuple containing a collection scope and an entity id
+ */
+public class EntityIdScope{
+ private final Id id;
+ private final CollectionScope collectionScope;
+
+ public EntityIdScope(CollectionScope collectionScope, Id id){
+ this.id = id;
+ this.collectionScope = collectionScope;
+ }
+
+
+ public Id getId() {
+ return id;
+ }
+
+
+ public CollectionScope getCollectionScope() {
+ return collectionScope;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
new file mode 100644
index 0000000..73efcba
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.serialization.impl.migration;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntitySet;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.functions.Func2;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * Data migration that copies entities from the previous serialization format into the current one.
+ * Each entity is loaded through the "from" strategy of the migration relationship, written through its
+ * "to" strategy, and the resulting mutations are executed every {@link #BATCH_SIZE} entities.
+ */
+public class MvccEntityDataMigrationImpl implements DataMigration2<EntityIdScope> {
+
+    /** Number of entities accumulated before the pending mutations are executed. */
+    private static final int BATCH_SIZE = 100;
+
+    private final Keyspace keyspace;
+    private final MvccEntityMigrationStrategy entityMigrationStrategy;
+
+    /**
+     * @param keyspace keyspace used to create the mutation batches
+     * @param serializationStrategy supplies the migration version and the from/to serialization pair
+     */
+    @Inject
+    public MvccEntityDataMigrationImpl( Keyspace keyspace, MvccEntityMigrationStrategy serializationStrategy ) {
+        this.keyspace = keyspace;
+        this.entityMigrationStrategy = serializationStrategy;
+    }
+
+    /** @return the version reported by the injected migration strategy */
+    @Override
+    public int getVersion() {
+        return entityMigrationStrategy.getVersion();
+    }
+
+    /**
+     * Copies every entity supplied by the provider into the new format and blocks until the stream completes.
+     *
+     * @param migrationDataProvider source of the entity ids, with their collection scopes, to migrate
+     * @param observer receives an update after each executed batch and a final summary message
+     */
+    @Override
+    public void migrate( final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+                         final ProgressObserver observer ) {
+
+        //running count of entities whose batch has been submitted, shared by all parallel streams
+        final AtomicLong atomicLong = new AtomicLong();
+
+        //capture the time the migration starts; the same value is passed to every load call
+        final UUID startTime = UUIDGenerator.newTimeUUID();
+
+        final long migrated = migrationDataProvider.getData().subscribeOn( Schedulers.io() )
+            .parallel( new Func1<Observable<EntityIdScope>, Observable<Long>>() {
+                //process the ids in parallel
+                @Override
+                public Observable<Long> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
+                    //each invocation accumulates into its own batch
+                    final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+                    return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
+                        //load the entity and add it to the total mutation
+                        @Override
+                        public void call( final EntityIdScope idScope ) {
+                            totalBatch.mergeShallow( copyEntity( idScope, startTime ) );
+                        }
+                    } )
+                    //every BATCH_SIZE entities, flush the mutation
+                    .buffer( BATCH_SIZE ).doOnNext( new Action1<List<EntityIdScope>>() {
+                        @Override
+                        public void call( final List<EntityIdScope> ids ) {
+                            //the last buffer may hold fewer than BATCH_SIZE ids, so count what arrived
+                            atomicLong.addAndGet( ids.size() );
+                            executeBatch( totalBatch, observer, atomicLong );
+                        }
+                    } )
+                    //count the results of this stream
+                    .reduce( 0L, new Func2<Long, List<EntityIdScope>, Long>() {
+                        @Override
+                        public Long call( final Long aLong, final List<EntityIdScope> ids ) {
+                            return aLong + ids.size();
+                        }
+                    } );
+                }
+            } )
+            //every parallel stream emits its own count: sum them, starting from the seed 0 so that
+            //last() still has a value to return if no stream emitted one
+            .reduce( 0L, new Func2<Long, Long, Long>() {
+                @Override
+                public Long call( final Long total, final Long streamCount ) {
+                    return total + streamCount;
+                }
+            } ).toBlocking().last();
+
+        //now we update the progress observer
+        observer.update( getVersion(), "Finished for this step. Migrated " + migrated + " entities total. " );
+    }
+
+    /**
+     * Loads one entity through the "from" strategy and builds the mutation that writes it through the "to" strategy.
+     *
+     * @param idScope the entity id and collection scope to copy
+     * @param startTime passed to the load call of the "from" strategy
+     * @return the mutation batch returned by the "to" strategy's write
+     */
+    private MutationBatch copyEntity( final EntityIdScope idScope, final UUID startTime ) {
+        final MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration =
+            entityMigrationStrategy.getMigration();
+        final CollectionScope currentScope = idScope.getCollectionScope();
+
+        final EntitySet allVersions =
+            migration.from().load( currentScope, Collections.singleton( idScope.getId() ), startTime );
+
+        //NOTE(review): the loaded entity is written without a null check -- confirm getEntity cannot return null
+        final MvccEntity version = allVersions.getEntity( idScope.getId() );
+        return migration.to().write( currentScope, version );
+    }
+
+    /**
+     * Executes the batch and reports the running count; on failure marks the migration failed and rethrows.
+     *
+     * @throws DataMigrationException wrapping the ConnectionException raised by the execute call
+     */
+    protected void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+        try {
+            batch.execute();
+
+            po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+        }
+        catch ( ConnectionException e ) {
+            po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+            throw new DataMigrationException( "Unable to migrate batches ", e );
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index 657d965..61b6b04 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfigImpl;
import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.core.migration.schema.MigrationManagerFig;
@@ -77,11 +78,8 @@ public class CommonModule extends AbstractModule {
//do multibindings for migrations
- Multibinder<DataMigration> applicationDataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
-// dataMigrationMultibinder.addBinding();
-// dataMigrationManagerMultibinder.addBinding().to( DataMigrationManagerImpl.class );
-// migrationBinding.addBinding().to( Key.get( MigrationInfoSerialization.class ) );
-
+ //create the empty multibinder so other plugins can use it
+ Multibinder.newSetBinder( binder(), MigrationPlugin.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
index 6dca092..bbff0b9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/ApplicationDataMigration.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.core.migration.data;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import rx.Observable;
+
/**
* Migrate applications
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
index e29571c..df1af72 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/CollectionDataMigration.java
@@ -20,7 +20,7 @@
package org.apache.usergrid.persistence.core.migration.data;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
+
/**
* Migrate Collections
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
deleted file mode 100644
index e1e3092..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.migration.data;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
-
-/**
- * An interface for updating data. Has 2 basic functions. First it will perform the migration and update the status
- * object.
- *
- * Second it will only migrate a single version. For instance, it will migrate from 0->1, or from 1->2. All migrations
- * must follow the following basic rules.
- *
- * <ol>
- * <li>They must not modify the structure of an existing column family. If the data format changes, a new
- * implementation and column family must be created. A proxy must be defined to do dual writes/single reads. </li>
- * <li>The migration must update the progress observer. This information should be made available cluster wide.</li>
- * <li>In the event a migration fails with an error, we should be able to roll back and remove new column families. We
- * can then fix the bug, and deploy again. Hence the need for the proxy, dual writes, and an immutable CF
- * format</li>
- * </ol>
- */
-
-
-public interface DataMigration <T> {
-
-
- /**
- * Migrate the data to the specified version
- * @param observer
- * @Return an observable containing the count of the number of elements migrated
- * @throws Throwable
- */
- public Observable<Long> migrate(final Observable<T> applicationEntityGroup,final ProgressObserver observer) throws Throwable;
-
- /**
- * Get the version of this migration. It must be unique.
- * @return
- */
- public int getVersion();
-
- public MigrationType getType();
-
- public interface ProgressObserver{
- /**
- * Mark the migration as failed
- * @param migrationVersion The migration version running during the failure
- * @param reason The reason to save
- */
- public void failed(final int migrationVersion, final String reason);
-
- /**
- * Mark the migration as failed with a stack trace
- * @param migrationVersion The migration version running during the failure
- * @param reason The error description to save
- * @param throwable The error that happened
- */
- public void failed(final int migrationVersion, final String reason, final Throwable throwable);
-
-
- /**
- * Update the status of the migration with the message
- *
- * @param message The message to save for the status
- */
- public void update(final int migrationVersion, final String message);
- }
-
- public enum MigrationType{
- Entities,
- Applications,
- System,
- Collections
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
index e47e264..1ca8d90 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.core.migration.data;
+import java.util.List;
+import java.util.Set;
+
import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
@@ -43,13 +46,13 @@ public interface DataMigrationManager {
* Get the current version of the schema
* @return
*/
- public int getCurrentVersion();
+ public int getCurrentVersion(final String pluginName);
/**
* Reset the system version to the version specified
* @param version
*/
- public void resetToVersion(final int version);
+ public void resetToVersion(final String pluginName, final int version);
/**
* Invalidate the cache for versions
@@ -60,5 +63,11 @@ public interface DataMigrationManager {
/**
* Return that last status of the migration
*/
- public String getLastStatus();
+ public String getLastStatus(final String pluginName);
+
+ /**
+ * Return the set of all plugin names
+ * @return
+ */
+ public Set<String> getPluginNames();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index 380bedc..ddb0a61 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -21,24 +21,24 @@ package org.apache.usergrid.persistence.core.migration.data;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.Singleton;
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
@Singleton
@@ -46,256 +46,122 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
private static final Logger LOG = LoggerFactory.getLogger( DataMigrationManagerImpl.class );
- private final TreeMap<Integer, DataMigration> migrationTreeMap = new TreeMap<>();
+ private final Map<String, MigrationPlugin> migrationPlugins = new HashMap<>();
private final MigrationInfoSerialization migrationInfoSerialization;
- private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
- private final ApplicationObservable applicationObservable;
- private final Set<DataMigration> dataMigrations;
-
-
- @Inject
- public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
- final Set<DataMigration> dataMigrations,
- final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
- final ApplicationObservable applicationObservable
- ) {
-
- Preconditions.checkNotNull( migrationInfoSerialization,
- "migrationInfoSerialization must not be null" );
- Preconditions.checkNotNull( dataMigrations, "migrations must not be null" );
- Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
- Preconditions.checkNotNull( applicationObservable, "applicationObservable must not be null" );
- this.dataMigrations = dataMigrations;
- this.migrationInfoSerialization = migrationInfoSerialization;
- this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
- this.applicationObservable = applicationObservable;
-
- }
-
-
- @Override
- public void migrate() throws MigrationException {
-
- if (!populateTreeMap())
- return;
-
- int currentVersion = migrationInfoSerialization.getCurrentVersion();
-
- if(currentVersion <= 0){
- resetToHighestVersion();
- //you have no work to do, just use the latest
- return;
- }
-
- LOG.info( "Saved schema version is {}, max migration version is {}", currentVersion,
- migrationTreeMap.lastKey() );
-
- //we have our migrations to run, execute them
- final Collection< DataMigration> migrationsToRun = migrationTreeMap.tailMap( currentVersion, false ).values();
-
-
-
- final CassandraProgressObserver observer = new CassandraProgressObserver();
-
- final Observable<ApplicationScope> appScopeObservable = applicationObservable.getAllApplicationScopes();
-
- final Observable<ApplicationEntityGroup> entitiesObservable = appScopeObservable.flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup>>() {
- @Override
- public Observable<ApplicationEntityGroup> call(ApplicationScope applicationScope) {
- return allEntitiesInSystemObservable.getAllEntitiesInSystem(appScopeObservable, 1000);
- }
- });
-
- final Observable<ApplicationDataMigration> applicationMigrationsRx = Observable.from(migrationsToRun)
- .filter(new Func1<DataMigration, Boolean>() {
- @Override
- public Boolean call(DataMigration dataMigration) {
- return dataMigration instanceof ApplicationDataMigration;
- }
- }).map(new Func1<DataMigration, ApplicationDataMigration>() {
- @Override
- public ApplicationDataMigration call(DataMigration dataMigration) {
- return (ApplicationDataMigration)dataMigration;
- }
- });
-
- final Observable<CollectionDataMigration> collectionMigrationsRx = Observable.from(migrationsToRun)
- .filter(new Func1<DataMigration, Boolean>() {
- @Override
- public Boolean call(DataMigration dataMigration) {
- return dataMigration instanceof CollectionDataMigration;
- }
- }).map(new Func1<DataMigration, CollectionDataMigration>() {
+ /**
+ * Cache to cache versions temporarily
+ */
+ private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
+ //cache the local value for 1 minute
+ .expireAfterWrite( 1, TimeUnit.MINUTES ).build( new CacheLoader<String, Integer>() {
@Override
- public CollectionDataMigration call(DataMigration dataMigration) {
- return (CollectionDataMigration) dataMigration;
+ public Integer load( final String key ) throws Exception {
+ return migrationInfoSerialization.getVersion( key );
}
- });
-
- Observable applications = applicationMigrationsRx
- .doOnNext(new Action1<ApplicationDataMigration>() {
- @Override
- public void call(ApplicationDataMigration dataMigration) {
-
- migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
-
- final int migrationVersion = dataMigration.getVersion();
-
- LOG.info("Running migration version {}", migrationVersion);
-
- observer.update(migrationVersion, "Starting migration");
+ } );
- //perform this migration, if it fails, short circuit
- try {
- dataMigration.migrate(appScopeObservable, observer).toBlocking().lastOrDefault(null);
- } catch (Throwable throwable) {
- observer.failed(migrationVersion, "Exception thrown during migration", throwable);
- LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
- throw new RuntimeException(throwable);
- }
- //we had an unhandled exception or the migration failed, short circuit
- if (observer.failed) {
- return;
- }
-
- //set the version
- migrationInfoSerialization.setVersion(migrationVersion);
-
- //update the observer for progress so other nodes can see it
- observer.update(migrationVersion, "Completed successfully");
-
- }
- });
-
-
- Observable entities =collectionMigrationsRx
- .doOnNext(new Action1<CollectionDataMigration>() {
- @Override
- public void call(CollectionDataMigration dataMigration) {
- migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
-
- final int migrationVersion = dataMigration.getVersion();
+ @Inject
+ public DataMigrationManagerImpl( final Set<MigrationPlugin> plugins,
+ final MigrationInfoSerialization migrationInfoSerialization ) {
- LOG.info("Running migration version {}", migrationVersion);
+ Preconditions.checkNotNull( plugins, "plugins must not be null" );
+ Preconditions.checkNotNull( migrationInfoSerialization, "migrationInfoSerialization must not be null" );
- observer.update(migrationVersion, "Starting migration");
+ this.migrationInfoSerialization = migrationInfoSerialization;
- //perform this migration, if it fails, short circuit
- try {
- dataMigration.migrate(entitiesObservable, observer).toBlocking().lastOrDefault(null);
- } catch (Throwable throwable) {
- observer.failed(migrationVersion, "Exception thrown during migration", throwable);
- LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
- throw new RuntimeException(throwable);
- }
- //we had an unhandled exception or the migration failed, short circuit
- if (observer.failed) {
- return;
- }
+ for ( MigrationPlugin plugin : plugins ) {
+ final String name = plugin.getName();
- //set the version
- migrationInfoSerialization.setVersion(migrationVersion);
- //update the observer for progress so other nodes can see it
- observer.update(migrationVersion, "Completed successfully");
- }
- });
+ final MigrationPlugin existing = migrationPlugins.get( name );
+ if ( existing != null ) {
+ throw new IllegalArgumentException( "Duplicate plugin name detected. A plugin with name " + name
+ + " is already implemented by class '" + existing.getClass().getName() + "'. Class '" + plugin
+ .getClass().getName() + "' is also trying to implement this name." );
+ }
- try {
- Observable
- .merge(applications, entities)
- .subscribeOn(Schedulers.io())
- .toBlocking().lastOrDefault(null);
- migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
- } catch (Exception e){
- LOG.error("Migration Failed",e);
+ migrationPlugins.put( name, plugin );
}
-
}
+ @Override
+ public void migrate() throws MigrationException {
- private boolean populateTreeMap() {
- if ( migrationTreeMap.isEmpty() ) {
- for ( DataMigration migration : dataMigrations ) {
-
- Preconditions.checkNotNull(migration,
- "A migration instance in the set of migrations was null. This is not allowed");
-
- final int version = migration.getVersion();
-
- final DataMigration existing = migrationTreeMap.get( version );
-
- if ( existing != null ) {
+ /**
+ * Invoke each plugin to attempt a migration
+ */
+ for(final MigrationPlugin plugin: migrationPlugins.values()){
+ final ProgressObserver observer = new CassandraProgressObserver(plugin.getName());
- final Class<? extends DataMigration> existingClass = existing.getClass();
+ plugin.run( observer );
+ }
- final Class<? extends DataMigration> currentClass = migration.getClass();
+ }
- throw new DataMigrationException( String.format(
- "Data migrations must be unique. Both classes %s and %s have version %d",
- existingClass, currentClass, version ) );
- }
- migrationTreeMap.put( version, migration );
- }
+ @Override
+ public boolean isRunning() {
+ for(final String pluginName :getPluginNames()){
+ if( migrationInfoSerialization.getStatusCode(pluginName) == StatusCode.RUNNING.status){
+ return true;
+ }
}
- if(migrationTreeMap.isEmpty()) {
- LOG.warn("No migrations found to run, exiting");
- return false;
- }
- return true;
+
+ return false;
}
@Override
- public boolean isRunning() {
- return migrationInfoSerialization.getStatusCode() == StatusCode.RUNNING.status;
+ public void invalidate() {
+ versionCache.invalidateAll();
}
@Override
- public void invalidate() {
- migrationInfoSerialization.invalidate();
+ public int getCurrentVersion( final String pluginName ) {
+ Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
+ return migrationInfoSerialization.getVersion( pluginName );
}
@Override
- public int getCurrentVersion() {
- return migrationInfoSerialization.getCurrentVersion();
- }
+ public void resetToVersion( final String pluginName, final int version ) {
+ Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
+ final MigrationPlugin plugin = migrationPlugins.get( pluginName );
+ Preconditions.checkArgument( plugin != null, "Plugin " + pluginName + " could not be found" );
- @Override
- public void resetToVersion( final int version ) {
- final int highestAllowed = migrationTreeMap.lastKey();
+ final int highestAllowed = plugin.getMaxVersion();
Preconditions.checkArgument( version <= highestAllowed,
- "You cannot set a version higher than the max of " + highestAllowed);
+ "You cannot set a version higher than the max of " + highestAllowed );
Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
- migrationInfoSerialization.setVersion( version );
+ migrationInfoSerialization.setVersion( pluginName, version );
}
- private int resetToHighestVersion( ) {
- final int highestAllowed = migrationTreeMap.lastKey();
- resetToVersion(highestAllowed);
- return highestAllowed;
+
+ @Override
+ public String getLastStatus( final String pluginName ) {
+ Preconditions.checkNotNull( pluginName, "pluginName cannot be null" );
+ return migrationInfoSerialization.getStatusMessage( pluginName );
}
+
@Override
- public String getLastStatus() {
- return migrationInfoSerialization.getStatusMessage();
+ public Set<String> getPluginNames() {
+ return migrationPlugins.keySet();
}
@@ -314,16 +180,20 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
}
- private final class CassandraProgressObserver implements DataMigration.ProgressObserver {
+ private final class CassandraProgressObserver implements ProgressObserver {
+
+ private final String pluginName;
private boolean failed = false;
+ private CassandraProgressObserver( final String pluginName ) {this.pluginName = pluginName;}
+
+
@Override
public void failed( final int migrationVersion, final String reason ) {
- final String storedMessage = String.format(
- "Failed to migrate, reason is appended. Error '%s'", reason );
+ final String storedMessage = String.format( "Failed to migrate, reason is appended. Error '%s'", reason );
update( migrationVersion, storedMessage );
@@ -332,7 +202,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
failed = true;
- migrationInfoSerialization.setStatusCode( StatusCode.ERROR.status );
+ migrationInfoSerialization.setStatusCode( pluginName, StatusCode.ERROR.status );
}
@@ -342,30 +212,28 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
throwable.printStackTrace( new PrintWriter( stackTrace ) );
- final String storedMessage = String.format(
- "Failed to migrate, reason is appended. Error '%s' %s", reason, stackTrace.toString() );
+ final String storedMessage = String.format( "Failed to migrate, reason is appended. Error '%s' %s", reason,
+ stackTrace.toString() );
update( migrationVersion, storedMessage );
- LOG.error( "Unable to migrate version {} due to reason {}.",
- migrationVersion, reason, throwable );
+ LOG.error( "Unable to migrate version {} due to reason {}.", migrationVersion, reason, throwable );
failed = true;
- migrationInfoSerialization.setStatusCode( StatusCode.ERROR.status );
+ migrationInfoSerialization.setStatusCode( pluginName, StatusCode.ERROR.status );
}
@Override
public void update( final int migrationVersion, final String message ) {
- final String formattedOutput = String.format(
- "Migration version %d. %s", migrationVersion, message );
+ final String formattedOutput = String.format( "Migration version %d. %s", migrationVersion, message );
//Print this to the info log
LOG.info( formattedOutput );
- migrationInfoSerialization.setStatusMessage( formattedOutput );
+ migrationInfoSerialization.setStatusMessage( pluginName, formattedOutput );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
index 3caa6aa..a972f8c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerialization.java
@@ -26,42 +26,48 @@ public interface MigrationInfoSerialization extends Migration {
/**
* Save the message to the cluster
+ * @param pluginName the name of the plugin
* @param message
*/
- public void setStatusMessage( final String message );
+ public void setStatusMessage(final String pluginName, final String message );
/**
- * Get the last status
+ * Get the last status for the plugin
* @return
*/
- public String getStatusMessage();
+ public String getStatusMessage(final String pluginName);
/**
- * Save the version
+ * Save the version for the plugin
* @param version
*/
- public void setVersion(final int version);
+ public void setVersion(final String pluginName, final int version);
/**
* Return the version
* @return
*/
- public int getVersion();
+ public int getVersion(final String pluginName);
/**
* Set the status and save them
+ * @param pluginName The name of the plugin
* @param status
* @return
*/
- public void setStatusCode( final int status );
+ public void setStatusCode(final String pluginName, final int status );
/**
- *
+ * Get the status code for the plugin
* @return The integer that's saved
*/
- public int getStatusCode();
+ public int getStatusCode(final String pluginName);
- public int getCurrentVersion();
-
- public void invalidate();
+ /**
+ * Deprecated: reads the version stored in the old, system-wide format so it can be migrated to the new per-plugin format.
+ * Returns -1 if no such version was ever set.
+ * @return
+ */
+ @Deprecated
+ public int getSystemVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index aeee0bb..f976568 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -50,18 +50,6 @@ import com.netflix.astyanax.serializers.StringSerializer;
public class MigrationInfoSerializationImpl implements MigrationInfoSerialization {
/**
- * Cache to cache versions temporarily
- */
- private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
- //cache the local value for 1 minute
- .expireAfterWrite(1, TimeUnit.MINUTES).build( new CacheLoader<String, Integer>() {
- @Override
- public Integer load( final String key ) throws Exception {
- return getVersion();
- }
- } );
-
- /**
* Just a hard coded scope since we need it
*/
private static final Id STATIC_ID =
@@ -78,7 +66,10 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
new MultiTennantColumnFamily<>( "Data_Migration_Info", ROW_KEY_SER, STRING_SERIALIZER );
- private static final ScopedRowKey<String> ROW_KEY = ScopedRowKey.fromKey( STATIC_ID, "" );
+ /**
+ * The row key we previously used to store versions. This is required to migrate versions in each module to the correct version
+ */
+ private static final ScopedRowKey<String> LEGACY_ROW_KEY = ScopedRowKey.fromKey( STATIC_ID, "" );
private static final String COL_STATUS_MESSAGE = "statusMessage";
@@ -96,10 +87,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
@Override
- public void setStatusMessage( final String message ) {
+ public void setStatusMessage(final String pluginName, final String message ) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
try {
- keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COL_STATUS_MESSAGE ).putValue( message, null )
+ keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COL_STATUS_MESSAGE ).putValue( message, null )
.execute();
}
catch ( ConnectionException e ) {
@@ -109,9 +102,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
@Override
- public String getStatusMessage() {
+ public String getStatusMessage(final String pluginName) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
try {
- return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COL_STATUS_MESSAGE )
+ return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COL_STATUS_MESSAGE )
.execute().getResult().getStringValue();
}
//swallow, it doesn't exist
@@ -125,28 +121,32 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
@Override
- public void setVersion( final int version ) {
+ public void setVersion(final String pluginName, final int version ) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
try {
- keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COLUMN_VERSION ).putValue( version, null )
+ keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COLUMN_VERSION ).putValue( version, null )
.execute();
}
catch ( ConnectionException e ) {
throw new DataMigrationException( "Unable to save status", e );
}
-
- versionCache.invalidateAll();
}
@Override
- public int getVersion() {
+ public int getVersion(final String pluginName) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
try {
- return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COLUMN_VERSION ).execute()
+ return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COLUMN_VERSION ).execute()
.getResult().getIntegerValue();
}
//swallow, it doesn't exist
catch ( NotFoundException nfe ) {
- return 0;
+ return -1;
}
catch ( ConnectionException e ) {
throw new DataMigrationException( "Unable to retrieve status", e );
@@ -155,9 +155,12 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
@Override
- public void setStatusCode( final int status ) {
+ public void setStatusCode(final String pluginName, final int status ) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
try {
- keyspace.prepareColumnMutation( CF_MIGRATION_INFO, ROW_KEY, COLUMN_STATUS_CODE ).putValue( status, null )
+ keyspace.prepareColumnMutation( CF_MIGRATION_INFO, rowKey, COLUMN_STATUS_CODE ).putValue( status, null )
.execute();
}
catch ( ConnectionException e ) {
@@ -167,32 +170,37 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
@Override
- public int getStatusCode() {
+ public int getStatusCode(final String pluginName) {
+
+ final ScopedRowKey<String> rowKey = ScopedRowKey.fromKey( STATIC_ID, pluginName);
+
try {
- return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( ROW_KEY ).getColumn( COLUMN_STATUS_CODE )
+ return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( rowKey ).getColumn( COLUMN_STATUS_CODE )
.execute().getResult().getIntegerValue();
}
//swallow, it doesn't exist
catch ( NotFoundException nfe ) {
- return 0;
+ return -1;
}
catch ( ConnectionException e ) {
throw new DataMigrationException( "Unable to retrieve status", e );
}
}
+
@Override
- public int getCurrentVersion() {
+ public int getSystemVersion() {
try {
- return versionCache.get("currentversion");
- }catch (Exception ee){
- throw new RuntimeException(ee);
+ return keyspace.prepareQuery( CF_MIGRATION_INFO ).getKey( LEGACY_ROW_KEY ).getColumn( COLUMN_VERSION )
+ .execute().getResult().getIntegerValue();
+ }
+ //swallow, it doesn't exist
+ catch ( NotFoundException nfe ) {
+ return -1;
+ }
+ catch ( ConnectionException e ) {
+ throw new DataMigrationException( "Unable to retrieve status", e );
}
- }
-
- @Override
- public void invalidate() {
- versionCache.invalidateAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
index f660c02..5830335 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/DataMigration2.java
@@ -41,31 +41,7 @@ public interface DataMigration2<T> {
* Get the version of this migration. It should be unique within the scope of the plugin
* @return
*/
- public int version();
+ public int getVersion();
- public interface ProgressObserver{
- /**
- * Mark the migration as failed
- * @param migrationVersion The migration version running during the failure
- * @param reason The reason to save
- */
- public void failed(final int migrationVersion, final String reason);
-
- /**
- * Mark the migration as failed with a stack trace
- * @param migrationVersion The migration version running during the failure
- * @param reason The error description to save
- * @param throwable The error that happened
- */
- public void failed(final int migrationVersion, final String reason, final Throwable throwable);
-
-
- /**
- * Update the status of the migration with the message
- *
- * @param message The message to save for the status
- */
- public void update(final int migrationVersion, final String message);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
index f04a035..880cfd1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationPlugin.java
@@ -24,8 +24,6 @@
package org.apache.usergrid.persistence.core.migration.data.newimpls;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-
/**
* A simple interface to return migration plugins. All versions within this migration plugin should have a name
@@ -34,7 +32,7 @@ public interface MigrationPlugin {
/**
- * Get the name of the plugin
+ * Get the name of the plugin. Must be unique
* @return
*/
public String getName();
@@ -42,6 +40,12 @@ public interface MigrationPlugin {
/**
* Run any migrations that may need to be run in this plugin
*/
- public void run(DataMigration.ProgressObserver observer);
+ public void run(ProgressObserver observer);
+
+ /**
+ * Get the maximum migration version this plugin implements
+ * @return
+ */
+ public int getMaxVersion();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
new file mode 100644
index 0000000..a9ad187
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/MigrationRelationship.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+/**
+ * Pairs the source ({@code from}) and destination ({@code to}) versioned data that describe the current migration state.
+ */
+public class MigrationRelationship<T extends VersionedData> {
+
+ // Public for direct, accessor-free reads; both fields are final, so the references are never reassigned.
+ public final T from;
+ public final T to;
+
+
+ public MigrationRelationship( T from, T to ) {
+ this.from = from;
+ this.to = to;
+ }
+}