You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/04 18:28:39 UTC
[4/4] incubator-usergrid git commit: Pushed migration tests down to
core tiers. WIP
Pushed migration tests down to core tiers. WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/802dcdec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/802dcdec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/802dcdec
Branch: refs/heads/USERGRID-405
Commit: 802dcdec4405f4a358cef7d253fa5695d64769cc
Parents: 7925a4b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Mar 3 17:34:21 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 4 10:28:10 2015 -0700
----------------------------------------------------------------------
.../AllApplicationsObservable.java | 39 +++
.../usergrid/corepersistence/CoreModule.java | 80 ++++--
.../corepersistence/CpEntityManagerFactory.java | 60 +++--
.../events/EntityVersionDeletedHandler.java | 17 +-
.../migration/CoreDataVersions.java | 44 +++
.../migration/CoreMigrationPlugin.java | 59 ++++
.../migration/EntityTypeMappingMigration.java | 90 +++----
.../migration/MigrationModuleVersion.java | 112 ++++++++
.../corepersistence/migration/Versions.java | 24 --
.../rx/impl/AbstractGraphVisitorImpl.java | 110 ++++++++
.../rx/impl/AllApplicationsObservableImpl.java | 142 ++++++++++
.../rx/impl/AllEntitiesInSystemImpl.java | 62 +++++
.../impl/AllEntitiesInSystemObservableImpl.java | 127 ---------
.../rx/impl/AllNodesInGraphImpl.java | 55 ++++
.../rx/impl/ApplicationObservableImpl.java | 149 ----------
.../migration/EntityDataMigrationIT.java | 270 -------------------
.../migration/EntityTypeMappingMigrationIT.java | 1 -
.../migration/GraphShardVersionMigrationIT.java | 235 ----------------
.../migration/MigrationTestRule.java | 9 +-
.../migration/TestProgressObserver.java | 71 -----
.../rx/AllEntitiesInSystemObservableIT.java | 45 +---
.../rx/ApplicationObservableTestIT.java | 16 +-
.../persistence/collection/MvccLogEntry.java | 1 +
.../collection/guice/CollectionModule.java | 31 +--
.../persistence/collection/guice/Write.java | 17 --
.../collection/guice/WriteUpdate.java | 17 --
.../EntityCollectionManagerFactoryImpl.java | 9 +-
.../impl/EntityCollectionManagerImpl.java | 21 +-
.../collection/mvcc/stage/write/WriteStart.java | 6 +-
.../migration/MvccEntityDataMigrationImpl.java | 50 ----
.../mvcc/stage/write/WriteStartTest.java | 6 +-
.../MvccEntityDataMigrationV1ToV3ImplTest.java | 179 ++++++++++++
.../core/guice/DataMigrationResetRule.java | 88 ++++++
.../migration/data/TestProgressObserver.java | 71 +++++
.../persistence/core/util/IdGenerator.java | 51 ++++
.../persistence/graph/guice/GraphModule.java | 7 +-
.../impl/EdgeDataMigrationImpl.java | 152 -----------
.../impl/migration/EdgeDataMigrationImpl.java | 149 ++++++++++
.../impl/migration/GraphMigrationPlugin.java | 7 +-
.../serialization/impl/migration/GraphNode.java | 39 +++
.../persistence/graph/GraphManagerIT.java | 21 +-
.../persistence/graph/GraphManagerLoadTest.java | 13 +-
.../graph/GraphManagerShardConsistencyIT.java | 10 +-
.../graph/GraphManagerShardingIT.java | 13 +-
.../graph/GraphManagerStressTest.java | 11 +-
.../graph/guice/TestGraphModule.java | 5 +-
.../graph/impl/EdgeDeleteListenerTest.java | 11 +-
.../graph/impl/NodeDeleteListenerTest.java | 9 +-
.../graph/impl/stage/EdgeDeleteRepairTest.java | 7 +-
.../graph/impl/stage/EdgeMetaRepairTest.java | 29 +-
.../EdgeMetadataSerializationTest.java | 39 +--
.../EdgeSerializationChopTest.java | 7 +-
.../serialization/EdgeSerializationTest.java | 21 +-
.../serialization/NodeSerializationTest.java | 15 +-
.../migration/EdgeDataMigrationImplTest.java | 177 ++++++++++++
.../impl/shard/EdgeShardSerializationTest.java | 7 +-
.../impl/shard/NodeShardAllocationTest.java | 27 +-
.../impl/shard/NodeShardCacheTest.java | 7 +-
.../impl/shard/ShardGroupCompactionTest.java | 7 +-
.../shard/count/NodeShardApproximationTest.java | 9 +-
.../NodeShardCounterSerializationTest.java | 6 +-
.../shard/impl/ShardEntryGroupIteratorTest.java | 15 +-
...rceDirectedEdgeDescendingComparatorTest.java | 23 +-
...getDirectedEdgeDescendingComparatorTest.java | 23 +-
.../graph/test/util/EdgeTestUtils.java | 29 +-
65 files changed, 1767 insertions(+), 1492 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
new file mode 100644
index 0000000..1187cf2
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
+
+import rx.Observable;
+
+
+/**
+ * Interface for generating an observable of all ApplicationScope
+ */
+public interface AllApplicationsObservable {
+
+ /**
+ * Return all applications in our system
+ * @return
+ */
+ public Observable<ApplicationScope> getAllApplications();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index d7d7f2f..9f01feb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -15,33 +15,38 @@
*/
package org.apache.usergrid.corepersistence;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provider;
-import com.google.inject.multibindings.Multibinder;
-import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
+import org.springframework.context.ApplicationContext;
+
import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
-import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
+import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
+import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl;
+import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.MvccEntityDataMigrationImpl;
import org.apache.usergrid.persistence.core.guice.CommonModule;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationPlugin;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
import org.apache.usergrid.persistence.index.guice.IndexModule;
import org.apache.usergrid.persistence.map.guice.MapModule;
import org.apache.usergrid.persistence.queue.guice.QueueModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContext;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provider;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
/**
@@ -70,19 +75,34 @@ public class CoreModule extends AbstractModule {
bind(EntityManagerFactory.class).toProvider( lazyEntityManagerFactoryProvider );
install( new CommonModule());
- install(new CollectionModule());
- install(new GraphModule());
+ install( new CollectionModule() {
+ /**
+ * configure our migration data provider for all entities in the system
+ */
+ @Override
+ public void configureMigrationProvider() {
+
+ bind(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){}).to(
+ AllEntitiesInSystemImpl.class );
+ }
+ } );
+ install( new GraphModule() {
+
+ /**
+ * Override the observable that needs to be used for migration
+ */
+ @Override
+ public void configureMigrationProvider() {
+ bind( new TypeLiteral<MigrationDataProvider<GraphNode>>() {} ).to(
+ AllNodesInGraphImpl.class );
+ }
+ } );
install(new IndexModule());
install(new MapModule());
install(new QueueModule());
bind(ManagerCache.class).to( CpManagerCache.class );
- bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
- bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
- Multibinder<DataMigration> dataMigrationMultibinder =
- Multibinder.newSetBinder( binder(), DataMigration.class );
- dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
Multibinder<EntityDeleted> entityBinder =
Multibinder.newSetBinder(binder(), EntityDeleted.class);
@@ -97,6 +117,26 @@ public class CoreModule extends AbstractModule {
versionCreatedMultibinder.addBinding().to(EntityVersionCreatedHandler.class);
+ /**
+ * Create our migrations for within our core plugin
+ *
+ */
+ Multibinder<DataMigration2<EntityIdScope>> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration2<EntityIdScope>>() {} );
+
+
+ dataMigrationMultibinder.addBinding().to( MvccEntityDataMigrationImpl.class );
+ dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
+
+
+ //wire up the collection migration plugin
+ Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to( CoreMigrationPlugin.class );
+
+ bind(AllApplicationsObservable.class).to(AllApplicationsObservableImpl.class);
+
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 61f32c5..9d0b03a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -15,12 +15,6 @@
*/
package org.apache.usergrid.corepersistence;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Injector;
-import com.yammer.metrics.annotation.Metered;
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
import java.util.Arrays;
import java.util.HashMap;
@@ -33,6 +27,12 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -43,16 +43,15 @@ import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Results;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import org.apache.usergrid.persistence.cassandra.CassandraService;
import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.Setup;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.util.Health;
@@ -68,15 +67,24 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.utils.UUIDUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.BeansException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import com.yammer.metrics.annotation.Metered;
+
import rx.Observable;
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+
/**
* Implement good-old Usergrid EntityManagerFactory with the new-fangled Core Persistence API.
@@ -107,7 +115,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private ManagerCache managerCache;
- private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
private DataMigrationManager dataMigrationManager;
@@ -171,10 +178,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
return managerCache;
}
- private AllEntitiesInSystemObservable getAllEntitiesObservable(){
- if(allEntitiesInSystemObservable==null)
- allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
- return allEntitiesInSystemObservable;
+ private Observable<EntityIdScope> getAllEntitiesObservable(){
+ return injector.getInstance( Key.get(new TypeLiteral< MigrationDataProvider<EntityIdScope>>(){})).getData();
}
@@ -601,7 +606,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public long performEntityCount() {
//TODO, this really needs to be a task that writes this data somewhere since this will get
//progressively slower as the system expands
- return (Long) getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
+ return (Long) getAllEntitiesObservable().longCount().toBlocking().last();
}
@@ -720,20 +725,25 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
@Override
public String getMigrateDataStatus() {
- return dataMigrationManager.getLastStatus();
+ //TODO USERGRID-405 return dataMigrationManager.getLastStatus();
+ return null;
}
@Override
public int getMigrateDataVersion() {
- return dataMigrationManager.getCurrentVersion();
+ //TODO USERGRID-405
+ //return dataMigrationManager.getCurrentVersion();
+ return 0;
}
@Override
public void setMigrationVersion( final int version ) {
- dataMigrationManager.resetToVersion( version );
- dataMigrationManager.invalidate();
+ //TODO USERGRID-405
+//
+// dataMigrationManager.resetToVersion( version );
+// dataMigrationManager.invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 127ae46..65dacee 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,6 +24,7 @@ import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.index.EntityIndex;
@@ -53,9 +54,11 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
private EntityManagerFactory emf;
+
@Override
- public void versionDeleted(
- final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+ public void versionDeleted( final CollectionScope scope, final Id entityId,
+ final List<MvccLogEntry> entityVersions ) {
+
// This check is for testing purposes and for a test that to be able to dynamically turn
// off and on delete previous versions so that it can test clean-up on read.
@@ -84,13 +87,12 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
scope.getName()
);
- rx.Observable.from(entityVersions)
- .subscribeOn(Schedulers.io())
+ rx.Observable.from( entityVersions )
.buffer(serializationFig.getBufferSize())
- .map(new Func1<List<MvccEntity>, List<MvccEntity>>() {
+ .map(new Func1<List<MvccLogEntry>, List<MvccLogEntry>>() {
@Override
- public List<MvccEntity> call(List<MvccEntity> entityList) {
- for (MvccEntity entity : entityList) {
+ public List<MvccLogEntry> call(List<MvccLogEntry> entityList) {
+ for (MvccLogEntry entity : entityList) {
eibatch.deindex(indexScope, entityId, entity.getVersion());
}
eibatch.execute();
@@ -99,4 +101,5 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
}).toBlocking().last();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
new file mode 100644
index 0000000..58ba6a3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreDataVersions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+/**
+ * Versions of data as they exist across our system
+ */
+public enum CoreDataVersions {
+ //even though this didn't really come first in time, we need to run this first in order to bring our system
+ //up to date so that our new migration module can proceed.
+
+ INITIAL(0),
+ MIGRATION_VERSION_FIX(1),
+ ID_MAP_FIX(2);
+
+
+ private final int version;
+
+
+ private CoreDataVersions( final int version ) {this.version = version;}
+
+
+ public int getVersion() {
+ return version;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
new file mode 100644
index 0000000..b95900f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.Set;
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.AbstractMigrationPlugin;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Migration plugin for the collection module
+ */
+@Singleton
+public class CoreMigrationPlugin extends AbstractMigrationPlugin<EntityIdScope> {
+
+ public static final String PLUGIN_NAME = "core-data";
+
+
+
+ @Inject
+ public CoreMigrationPlugin( final Set<DataMigration2<EntityIdScope>> entityDataMigrations,
+ final MigrationDataProvider<EntityIdScope> entityIdScopeDataMigrationProvider,
+ final MigrationInfoSerialization migrationInfoSerialization ) {
+ super( entityDataMigrations, entityIdScopeDataMigrationProvider, migrationInfoSerialization );
+ }
+
+
+ @Override
+ public String getName() {
+ return PLUGIN_NAME;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 4dbc373..80d7ebe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -23,88 +23,88 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.migration.data.CollectionDataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Action1;
import rx.functions.Func1;
-import rx.schedulers.Schedulers;
/**
* Migration to ensure that our entity id is written into our map data
*/
-public class EntityTypeMappingMigration implements CollectionDataMigration {
+public class EntityTypeMappingMigration implements DataMigration2<EntityIdScope> {
private final ManagerCache managerCache;
- private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+ private final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable;
@Inject
- public EntityTypeMappingMigration( final ManagerCache managerCache, final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
- this.managerCache = managerCache;
+ public EntityTypeMappingMigration( final ManagerCache managerCache,
+ final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable ) {
+ this.managerCache = managerCache;
this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
}
@Override
- public Observable migrate(final Observable<ApplicationEntityGroup> applicationEntityGroupObservable, final ProgressObserver observer) throws Throwable {
+ public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+ final ProgressObserver observer ) {
final AtomicLong atomicLong = new AtomicLong();
- return applicationEntityGroupObservable.flatMap(new Func1<ApplicationEntityGroup, Observable<Long>>() {
- @Override
- public Observable call(final ApplicationEntityGroup applicationEntityGroup) {
- final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
-
- final MapManager mapManager = managerCache.getMapManager(ms);
- return Observable.from(applicationEntityGroup.entityIds)
- .subscribeOn(Schedulers.io())
- .map(new Func1<EntityIdScope, Long>() {
- @Override
- public Long call(EntityIdScope idScope) {
- final UUID entityUuid = idScope.getId().getUuid();
- final String entityType = idScope.getId().getType();
-
- mapManager.putString(entityUuid.toString(), entityType);
-
- if (atomicLong.incrementAndGet() % 100 == 0) {
- updateStatus(atomicLong, observer);
- }
- return atomicLong.get();
- }
- });
- }
- });
- }
+ return allEntitiesInSystemObservable.getData()
+ //process the entities in parallel
+ .parallel( new Func1<Observable<EntityIdScope>, Observable<EntityIdScope>>() {
+
+
+ @Override
+ public Observable<EntityIdScope> call( final Observable<EntityIdScope> entityIdScopeObservable ) {
- private void updateStatus( final AtomicLong counter, final ProgressObserver observer ) {
+ //for each entity observable, get the map scope and write it to the map
+ return entityIdScopeObservable.doOnNext( new Action1<EntityIdScope>() {
+ @Override
+ public void call( final EntityIdScope entityIdScope ) {
+ final MapScope ms = CpNamingUtils
+ .getEntityTypeMapScope( entityIdScope.getCollectionScope().getApplication() );
- observer.update( getVersion(), String.format( "Updated %d entities", counter.get() ) );
+ final MapManager mapManager = managerCache.getMapManager( ms );
+
+ final UUID entityUuid = entityIdScope.getId().getUuid();
+ final String entityType = entityIdScope.getId().getType();
+
+ mapManager.putString( entityUuid.toString(), entityType );
+
+ if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+ observer.update( getMaxVersion(),
+ String.format( "Updated %d entities", atomicLong.get() ) );
+ }
+ }
+ } );
+ }
+ } ).count().toBlocking().last();
}
@Override
- public int getVersion() {
- return Versions.VERSION_1;
+ public boolean supports( final int currentVersion ) {
+ //we move from the migration version fix to the current version
+ return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion() == currentVersion;
}
+
@Override
- public MigrationType getType() {
- return MigrationType.Entities;
+ public int getMaxVersion() {
+ return CoreDataVersions.ID_MAP_FIX.getVersion();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
new file mode 100644
index 0000000..1c124ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/MigrationModuleVersion.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV2Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigrationPlugin;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.ProgressObserver;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationV2Impl;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Migration to set our module versions now that we've refactor for sub modules Keeps the EntityIdScope because it won't
+ * subscribe to the data provider.
+ */
+public class MigrationModuleVersion implements DataMigration2<EntityIdScope> {
+
+
+ /**
+ * The migration from 0 -> 1 that re-writes all the entity id's into the map module
+ */
+ private static final int ID_MIGRATION = 1;
+
+ /**
+ * The migration from 1-> 2 that shards our edge meta data
+ */
+ private static final int EDGE_SHARD_MIGRATION = 2;
+
+ /**
+ * The migration from 2-> 3 that fixed the short truncation bug
+ */
+ private static final int ENTITY_V2_MIGRATION = 3;
+
+ private final MigrationInfoSerialization migrationInfoSerialization;
+
+ private final MvccEntitySerializationStrategyV2Impl serializationStrategyV2;
+
+ private final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2;
+
+
+ @Inject
+ public MigrationModuleVersion( final MigrationInfoSerialization migrationInfoSerialization,
+ final MvccEntitySerializationStrategyV2Impl serializationStrategyV2,
+ final EdgeMetadataSerializationV2Impl edgeMetadataSerializationV2 ) {
+ this.migrationInfoSerialization = migrationInfoSerialization;
+ this.serializationStrategyV2 = serializationStrategyV2;
+ this.edgeMetadataSerializationV2 = edgeMetadataSerializationV2;
+ }
+
+
+ @Override
+ public int migrate( final int currentVersion, final MigrationDataProvider<EntityIdScope> migrationDataProvider,
+ final ProgressObserver observer ) {
+
+ //we ignore our current version, since it will always be 0
+ final int legacyVersion = migrationInfoSerialization.getSystemVersion();
+
+ //now we store versions for each of our modules
+
+ switch ( legacyVersion ) {
+ case ID_MIGRATION:
+ //no op, we need to migration everything in all modules
+ break;
+ //we need to set the version of the entity data, and our edge shard migration. The fall through (no break) is deliberate
+ case ENTITY_V2_MIGRATION:
+ migrationInfoSerialization.setVersion( CollectionMigrationPlugin.PLUGIN_NAME, serializationStrategyV2.getImplementationVersion() );
+ case EDGE_SHARD_MIGRATION:
+ //set our shard migration to the migrated version
+ migrationInfoSerialization.setVersion( GraphMigrationPlugin.PLUGIN_NAME, edgeMetadataSerializationV2.getImplementationVersion() );
+ break;
+ }
+
+ return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion();
+ }
+
+
+ @Override
+ public boolean supports( final int currentVersion ) {
+ //we move from the migration version fix to the current version
+ return CoreDataVersions.INITIAL.getVersion() == currentVersion;
+ }
+
+
+ @Override
+ public int getMaxVersion() {
+ return CoreDataVersions.MIGRATION_VERSION_FIX.getVersion();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index 7d05733..d921353 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -22,34 +22,10 @@
package org.apache.usergrid.corepersistence.migration;
-import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.core.guice.V3Impl;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
-
-
/**
* Simple class to hold the constants of all versions
*/
public class Versions {
- /**
- * Version 1 of our mappings
- */
- public static final int VERSION_1 = 1;
-
- /**
- * Version 2. Edge meta changes
- */
- public static final int VERSION_2 = EdgeMigrationStrategy.MIGRATION_VERSION;
-
- /**
- * Version 3. migrate from entity serialization 1 -> 2
- */
- public static final int VERSION_3 = V2Impl.MIGRATION_VERSION;
-
- /**
- * Version 4. migrate from entity serialization 1 -> 2
- */
- public static final int VERSION_4 = V3Impl.MIGRATION_VERSION;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
new file mode 100644
index 0000000..07b7a3d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AbstractGraphVisitorImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+public abstract class AbstractGraphVisitorImpl<T> implements MigrationDataProvider<T> {
+
+ private final AllApplicationsObservable applicationObservable;
+ private final GraphManagerFactory graphManagerFactory;
+ private final TargetIdObservable targetIdObservable;
+
+ @Inject
+ public AbstractGraphVisitorImpl( AllApplicationsObservable applicationObservable,
+ GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable ) {
+
+ this.applicationObservable = applicationObservable;
+ this.graphManagerFactory = graphManagerFactory;
+ this.targetIdObservable = targetIdObservable;
+ }
+
+
+
+ @Override
+ public Observable<T> getData() {
+ return applicationObservable.getAllApplications().flatMap( new Func1<ApplicationScope, Observable<T>>() {
+ @Override
+ public Observable<T> call( final ApplicationScope applicationScope ) {
+ return getAllEntities( applicationScope );
+ }
+ } );
+
+ }
+
+
+ private Observable<T> getAllEntities(final ApplicationScope applicationScope) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+ final Id applicationId = applicationScope.getApplication();
+
+ //load all nodes that are targets of our application node. I.E.
+ // entities that have been saved
+ final Observable<Id> entityNodes =
+ targetIdObservable.getTargetNodes(gm, applicationId);
+
+ //emit Scope + ID
+
+ //create our application node to emit since it's an entity as well
+ final Observable<Id> applicationNode = Observable.just(applicationId);
+
+ //merge both the specified application node and the entity node
+ // so they all get used
+ return Observable
+ .merge( applicationNode, entityNodes ).
+ map( new Func1<Id, T>() {
+ @Override
+ public T call( final Id id ) {
+ return generateData(applicationScope, id);
+ }
+ } );
+ }
+
+
+ /**
+ * Generate the data for the observable stream from the scope and the node id
+ * @param applicationScope
+ * @param nodeId
+ * @return
+ */
+ protected abstract T generateData(final ApplicationScope applicationScope, final Id nodeId);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
new file mode 100644
index 0000000..0fc5452
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
+
+
+/**
+ * An observable that will emit all application stored in the system.
+ */
+public class AllApplicationsObservableImpl implements AllApplicationsObservable {
+
+ private static final Logger logger = LoggerFactory.getLogger( AllApplicationsObservableImpl.class );
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
+
+ @Inject
+ public AllApplicationsObservableImpl( EntityCollectionManagerFactory entityCollectionManagerFactory,
+ GraphManagerFactory graphManagerFactory ){
+
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
+ }
+
+
+
+ @Override
+ public Observable<ApplicationScope> getAllApplications() {
+
+ //emit our 3 hard coded applications that are used the manage the system first.
+ //this way consumers can perform whatever work they need to on the root system first
+ final Observable<ApplicationScope> systemIds = Observable.from( Arrays
+ .asList( getApplicationScope( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+ getApplicationScope( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+ getApplicationScope( CpNamingUtils.SYSTEM_APP_ID ) ) );
+
+
+ final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
+
+ final CollectionScope appInfoCollectionScope =
+ new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
+
+ final EntityCollectionManager collectionManager =
+ entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
+
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
+
+
+ String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
+
+ Id rootAppId = appScope.getApplication();
+
+
+ //we have app infos. For each of these app infos, we have to load the application itself
+ Observable<ApplicationScope> appIds = gm.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ null ) ).flatMap( new Func1<Edge, Observable<ApplicationScope>>() {
+ @Override
+ public Observable<ApplicationScope> call( final Edge edge ) {
+ //get the app info and load it
+ final Id appInfo = edge.getTargetNode();
+
+ return collectionManager.load( appInfo )
+ //filter out null entities
+ .filter( new Func1<Entity, Boolean>() {
+ @Override
+ public Boolean call( final Entity entity ) {
+ if ( entity == null ) {
+ logger.warn( "Encountered a null application info for id {}", appInfo );
+ return false;
+ }
+
+ return true;
+ }
+ } )
+ //get the id from the entity
+ .map( new Func1<Entity, ApplicationScope>() {
+
+
+ @Override
+ public ApplicationScope call( final Entity entity ) {
+
+ final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+ return getApplicationScope( uuid );
+ }
+ } );
+ }
+ } );
+
+ return Observable.merge( systemIds, appIds );
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
new file mode 100644
index 0000000..758a8aa
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemImpl.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+@Singleton
+public class AllEntitiesInSystemImpl extends AbstractGraphVisitorImpl<EntityIdScope> {
+
+
+ @Inject
+ public AllEntitiesInSystemImpl( final AllApplicationsObservable applicationObservable,
+ final GraphManagerFactory graphManagerFactory,
+ final TargetIdObservable targetIdObservable ) {
+ super( applicationObservable, graphManagerFactory, targetIdObservable );
+ }
+
+
+ @Override
+ protected EntityIdScope generateData( final ApplicationScope applicationScope, final Id nodeId ) {
+ CollectionScope scope =
+ CpNamingUtils.getCollectionScopeNameFromEntityType( applicationScope.getApplication(), nodeId.getType() );
+
+ final EntityIdScope idScope = new EntityIdScope( scope, nodeId );
+
+ return idScope;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
deleted file mode 100644
index 55667cb..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * An observable that will emit every entity Id stored in our entire system across all apps.
- * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
- * source node
- */
-public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable<CollectionScope> {
-
- private final ApplicationObservable applicationObservable;
- private final GraphManagerFactory graphManagerFactory;
- private final TargetIdObservable targetIdObservable;
-
- @Inject
- public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable) {
-
- this.applicationObservable = applicationObservable;
- this.graphManagerFactory = graphManagerFactory;
- this.targetIdObservable = targetIdObservable;
- }
-
- public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final int bufferSize) {
- return getAllEntitiesInSystem(applicationObservable.getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
- @Override
- public ApplicationScope call(Id id) {
- return new ApplicationScopeImpl(id);
- }
- }), bufferSize);
-
- }
-
- public Observable<ApplicationEntityGroup<CollectionScope>> getAllEntitiesInSystem(final Observable<ApplicationScope> appIdsObservable, final int bufferSize) {
- //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
- return appIdsObservable
- .flatMap(new Func1<ApplicationScope, Observable<ApplicationEntityGroup<CollectionScope>>>() {
- @Override
- public Observable<ApplicationEntityGroup<CollectionScope>> call(final ApplicationScope applicationScope) {
- return getAllEntities(applicationScope, bufferSize);
- }
- });
- }
-
- private Observable<ApplicationEntityGroup<CollectionScope>> getAllEntities(final ApplicationScope applicationScope, final int bufferSize) {
- final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
- final Id applicationId = applicationScope.getApplication();
-
- //load all nodes that are targets of our application node. I.E.
- // entities that have been saved
- final Observable<Id> entityNodes =
- targetIdObservable.getTargetNodes(gm, applicationId);
-
-
- //get scope here
-
-
- //emit Scope + ID
-
- //create our application node to emit since it's an entity as well
- final Observable<Id> applicationNode = Observable.just(applicationId);
-
- //merge both the specified application node and the entity node
- // so they all get used
- return Observable
- .merge(applicationNode, entityNodes)
- .buffer(bufferSize)
- .map(new Func1<List<Id>, List<EntityIdScope<CollectionScope>>>() {
- @Override
- public List<EntityIdScope<CollectionScope>> call(List<Id> ids) {
- List<EntityIdScope<CollectionScope>> scopes = new ArrayList<>(ids.size());
- for (Id id : ids) {
- CollectionScope scope = CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId, id.getType());
- EntityIdScope<CollectionScope> idScope = new EntityIdScope<>(id, scope);
- scopes.add(idScope);
- }
- return scopes;
- }
- })
- .map(new Func1<List<EntityIdScope<CollectionScope>>, ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public ApplicationEntityGroup<CollectionScope> call(final List<EntityIdScope<CollectionScope>> scopes) {
- return new ApplicationEntityGroup<>(applicationScope, scopes);
- }
- });
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
new file mode 100644
index 0000000..19292b8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllNodesInGraphImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import org.apache.usergrid.corepersistence.AllApplicationsObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * An observable that will emit every entity's node Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+@Singleton
+public class AllNodesInGraphImpl extends AbstractGraphVisitorImpl<GraphNode> {
+
+
+ @Inject
+ public AllNodesInGraphImpl( final AllApplicationsObservable applicationObservable,
+ final GraphManagerFactory graphManagerFactory,
+ final TargetIdObservable targetIdObservable ) {
+ super( applicationObservable, graphManagerFactory, targetIdObservable );
+ }
+
+
+ @Override
+ protected GraphNode generateData( final ApplicationScope applicationScope, final Id nodeId ) {
+ return new GraphNode( applicationScope, nodeId );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
deleted file mode 100644
index f4a1057..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx.impl;
-
-
-import java.util.Arrays;
-import java.util.UUID;
-
-import com.google.inject.Inject;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
-
-/**
- * An observable that will emit all application stored in the system.
- */
-public class ApplicationObservableImpl implements ApplicationObservable {
-
- private static final Logger logger = LoggerFactory.getLogger( ApplicationObservableImpl.class );
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final GraphManagerFactory graphManagerFactory;
-
- @Inject
- public ApplicationObservableImpl(EntityCollectionManagerFactory entityCollectionManagerFactory, GraphManagerFactory graphManagerFactory){
-
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.graphManagerFactory = graphManagerFactory;
- }
-
-
- @Override
- public Observable<Id> getAllApplicationIds() {
-
- //emit our 3 hard coded applications that are used the manage the system first.
- //this way consumers can perform whatever work they need to on the root system first
- final Observable<Id> systemIds = Observable.from( Arrays
- .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
- final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
-
- final CollectionScope appInfoCollectionScope =
- new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
-
- final EntityCollectionManager collectionManager =
- entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
-
-
- final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
-
-
- String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
-
- Id rootAppId = appScope.getApplication();
-
-
- //we have app infos. For each of these app infos, we have to load the application itself
- Observable<Id> appIds = gm.loadEdgesFromSource(
- new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
- @Override
- public Observable<Id> call( final Edge edge ) {
- //get the app info and load it
- final Id appInfo = edge.getTargetNode();
-
- return collectionManager.load( appInfo )
- //filter out null entities
- .filter( new Func1<Entity, Boolean>() {
- @Override
- public Boolean call( final Entity entity ) {
- if ( entity == null ) {
- logger.warn( "Encountered a null application info for id {}", appInfo );
- return false;
- }
-
- return true;
- }
- } )
- //get the id from the entity
- .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
-
-
- @Override
- public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-
- final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
-
- return CpNamingUtils.generateApplicationId( uuid );
- }
- } );
- }
- } );
-
- return Observable.merge( systemIds, appIds );
- }
-
- @Override
- public Observable<ApplicationScope> getAllApplicationScopes() {
- return getAllApplicationIds().map(new Func1<Id, ApplicationScope>() {
- @Override
- public ApplicationScope call(Id id) {
- ApplicationScope scope = new ApplicationScopeImpl(id);
- return scope;
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
deleted file mode 100644
index 4d87f8b..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntityDataMigrationImpl;
-import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
-import org.apache.usergrid.persistence.core.migration.data.*;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.EntityIdScope;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-
-import org.apache.usergrid.AbstractCoreIT;
-import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.EntityWriteHelper;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Injector;
-import com.google.inject.Key;
-
-import net.jcip.annotations.NotThreadSafe;
-
-import rx.functions.Action1;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
-@NotThreadSafe
-public class EntityDataMigrationIT extends AbstractCoreIT {
-
-
- private Injector injector;
-
-
- private CollectionDataMigration entityDataMigration;
- private DataMigrationManager dataMigrationManager;
- private MigrationInfoSerialization migrationInfoSerialization;
- private MvccEntitySerializationStrategy v1Strategy;
- private MvccEntitySerializationStrategy v2Strategy;
- private EntityManagerFactory emf;
-
-
-
- /**
- * Rule to do the resets we need
- */
- @Rule
- public MigrationTestRule migrationTestRule =
-
- new MigrationTestRule( app, SpringResource.getInstance().getBean( Injector.class ) ,MvccEntityDataMigrationImpl.class );
- private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
- private ApplicationObservable applicationObservable;
-
-
- @Before
- public void setup() {
- emf = setup.getEmf();
- injector = SpringResource.getInstance().getBean(Injector.class);
- entityDataMigration = injector.getInstance( MvccEntityDataMigrationImpl.class );
- injector = SpringResource.getInstance().getBean( Injector.class );
- dataMigrationManager = injector.getInstance( DataMigrationManager.class );
- migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
- MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
- allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
- applicationObservable = injector.getInstance(ApplicationObservable.class);
-
- v1Strategy = strategy.getMigration().from();
- v2Strategy = strategy.getMigration().to();
- }
-
-
- @Test
- @Ignore("Awaiting fix for USERGRID-268")
- public void testDataMigration() throws Throwable {
-
- assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
- assertEquals( "Previous version expected", 2, dataMigrationManager.getCurrentVersion());
-
-
- final EntityManager newAppEm = app.getEntityManager();
-
- final String type1 = "type1thing";
- final String type2 = "type2thing";
- final int size = 10;
-
- final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
- final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
-
-
- final Set<Id> createdEntityIds = new HashSet<>();
- createdEntityIds.addAll( type1Identities );
- createdEntityIds.addAll( type2Identities );
-
-
- final TestProgressObserver progressObserver = new TestProgressObserver();
-
-
- //load everything that appears in v1, migrate and ensure it appears in v2
- final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
- //set that holds all entityIds for later assertion
- final Set<Id> entityIds = new HashSet<>(10000);
-
-
- //read everything in previous version format and put it into our types. Assumes we're
- //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
-
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public void call(
- final ApplicationEntityGroup<CollectionScope> entity ) {
-
- //add all versions from history to our comparison
- for ( final EntityIdScope id : entity.entityIds ) {
-
- CollectionScope scope = CpNamingUtils
- .getCollectionScopeNameFromEntityType(
- entity.applicationScope.getApplication(),
- id.getId().getType());
-
- final Iterator<MvccEntity> versions = v1Strategy
- .loadDescendingHistory( scope, id.getId(), UUIDGenerator.newTimeUUID(),
- 100 );
-
- while ( versions.hasNext() ) {
-
- final MvccEntity mvccEntity = versions.next();
-
- savedEntities.add( mvccEntity );
-
- createdEntityIds.remove( mvccEntity.getId() );
-
- entityIds.add( id.getId() );
- }
- }
- }
- } ).toBlocking().lastOrDefault( null );
-
- assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
- assertTrue( "Saved new entities", savedEntities.size() > 0 );
-
- //perform the migration
- rx.Observable<ApplicationEntityGroup> oRx = allEntitiesInSystemObservable.getAllEntitiesInSystem(applicationObservable.getAllApplicationScopes(), 1000);
-
- entityDataMigration.migrate(oRx, progressObserver).toBlocking().last();
-
- assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
- assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
- //write the status and version, then invalidate the cache so it appears
- migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
- migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
- dataMigrationManager.invalidate();
-
- assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
- dataMigrationManager.getCurrentVersion() );
-
-
- //now visit all entities in the system again, load them from v2, and ensure they're the same
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public void call(
- final ApplicationEntityGroup<CollectionScope> entity ) {
- //add all versions from history to our comparison
- for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {
-
- CollectionScope scope = CpNamingUtils
- .getCollectionScopeNameFromEntityType(
- entity.applicationScope.getApplication(),
- id.getId().getType());
-
- final Iterator<MvccEntity> versions = v2Strategy
- .loadDescendingHistory( scope, id.getId(),
- UUIDGenerator.newTimeUUID(), 100 );
-
- while ( versions.hasNext() ) {
-
- final MvccEntity mvccEntity = versions.next();
-
- savedEntities.remove( mvccEntity );
- }
- }
- }
- }
-
-
- ).toBlocking().lastOrDefault( null );
-
-
- assertEquals( "All entities migrated", 0, savedEntities.size() );
-
-
- //now visit all entities in the system again, and load them from the EM,
- // ensure we see everything we did in the v1 traversal
- allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
- .doOnNext( new Action1<ApplicationEntityGroup<CollectionScope>>() {
- @Override
- public void call(
- final ApplicationEntityGroup<CollectionScope> entity ) {
-
- final EntityManager em = emf.getEntityManager(
- entity.applicationScope.getApplication().getUuid() );
-
- //add all versions from history to our comparison
- for ( final EntityIdScope<CollectionScope> id : entity.entityIds ) {
-
-
- try {
- final Entity emEntity = em.get( SimpleEntityRef.fromId( id.getId() ) );
-
- if(emEntity != null){
- entityIds.remove( id.getId() );
- }
- }
- catch ( Exception e ) {
- throw new RuntimeException("Error loading entity", e);
- }
- }
- }
- }
-
-
- ).toBlocking().lastOrDefault( null );
-
-
- assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/802dcdec/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index 421b875..99fb139 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -36,7 +36,6 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;