You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/20 23:48:23 UTC
[1/3] incubator-usergrid git commit: Refactored generation of
collection scope to be re-used with CpNamingUtils for consistency
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-250-buffer-size-fix 68f4e0f1a -> e32d5212a
Refactored generation of collection scope to be re-used with CpNamingUtils for consistency
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9ed79642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9ed79642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9ed79642
Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 9ed796423f15a8ef33137b113f6e04c578994b21
Parents: 68f4e0f
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 14:15:24 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 14:23:38 2014 -0700
----------------------------------------------------------------------
.../migration/EntityDataMigration.java | 139 +++++++++++++++++
.../migration/EntityTypeMappingMigration.java | 28 ++--
.../migration/GraphShardVersionMigration.java | 104 +++++++------
.../rx/AllEntitiesInSystemObservable.java | 27 ++--
.../corepersistence/StaleIndexCleanupTest.java | 5 +-
.../migration/EntityTypeMappingMigrationIT.java | 74 +++++-----
.../migration/GraphShardVersionMigrationIT.java | 148 +++++++++++--------
.../rx/AllEntitiesInSystemObservableIT.java | 23 ++-
8 files changed, 364 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
new file mode 100644
index 0000000..79d31a8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.functions.Action1;
+
+
+/**
+ * Migration for migrating graph edges to the new Shards
+ */
+public class EntityDataMigration implements DataMigration {
+
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
+
+
+ private final MvccEntitySerializationStrategy v1Serialization;
+ private final MvccEntitySerializationStrategy v2Serialization;
+
+ private final ManagerCache managerCache;
+ private final Keyspace keyspace;
+
+
+ @Inject
+ public EntityDataMigration( @PreviousImpl final MvccEntitySerializationStrategy v1Serialization,
+ @CurrentImpl final MvccEntitySerializationStrategy v2Serialization,
+ final ManagerCache managerCache, final Keyspace keyspace ) {
+ this.v1Serialization = v1Serialization;
+ this.v2Serialization = v2Serialization;
+ this.managerCache = managerCache;
+ this.keyspace = keyspace;
+ }
+
+
+ @Override
+ public void migrate( final ProgressObserver observer ) throws Throwable {
+
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
+ new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+
+
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+
+ final UUID now = UUIDGenerator.newTimeUUID();
+
+ final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
+
+
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+
+ for ( Id entityId : applicationEntityGroup.entityIds ) {
+
+ CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
+ appScopeId, entityId.getType() );
+
+
+ Iterator<MvccEntity> allVersions =
+ v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+
+ while ( allVersions.hasNext() ) {
+ final MvccEntity version = allVersions.next();
+
+ final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+
+ totalBatch.mergeShallow( versionBatch );
+
+ if ( totalBatch.getRowCount() >= 50 ) {
+ try {
+ totalBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new DataMigrationException( "Unable to migrate batches ", e );
+ }
+ }
+ }
+ }
+
+ try {
+ totalBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new DataMigrationException( "Unable to migrate batches ", e );
+ }
+ }
+ } ).toBlocking().last();
+ }
+
+
+ @Override
+ public int getVersion() {
+ return Versions.VERSION_3;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 1adfe73..8089dfd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -29,13 +29,10 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
@@ -47,9 +44,6 @@ import rx.functions.Action1;
*/
public class EntityTypeMappingMigration implements DataMigration {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class );
-
private final ManagerCache managerCache;
@@ -65,25 +59,27 @@ public class EntityTypeMappingMigration implements DataMigration {
final AtomicLong atomicLong = new AtomicLong();
- AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.EntityData entityData ) {
+ public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
- final MapScope ms = CpNamingUtils.getEntityTypeMapScope( entityData.applicationScope.getApplication() );
+ final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
final MapManager mapManager = managerCache.getMapManager( ms );
- final UUID entityUuid = entityData.entityId.getUuid();
- final String entityType = entityData.entityId.getType();
+ for(Id entityId: applicationEntityGroup.entityIds) {
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
- mapManager.putString( entityUuid.toString(), entityType );
+ mapManager.putString( entityUuid.toString(), entityType );
- if ( atomicLong.incrementAndGet() % 100 == 0 ) {
- updateStatus( atomicLong, observer );
+ if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+ updateStatus( atomicLong, observer );
+ }
}
}
} ).toBlocking().lastOrDefault( null );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index ac4cd58..3b92570 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
@@ -64,8 +65,7 @@ public class GraphShardVersionMigration implements DataMigration {
@Inject
public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization,
- final ManagerCache managerCache, final
- Keyspace keyspace ) {
+ final ManagerCache managerCache, final Keyspace keyspace ) {
this.v2Serialization = v2Serialization;
this.managerCache = managerCache;
this.keyspace = keyspace;
@@ -77,51 +77,71 @@ public class GraphShardVersionMigration implements DataMigration {
final AtomicLong counter = new AtomicLong();
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache).flatMap(
- new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).flatMap(
+ new Func1<AllEntitiesInSystemObservable.ApplicationEntityGroup, Observable<List<Edge>>>() {
@Override
- public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) {
- logger.info( "Migrating edges from node {} in scope {}", entityData.entityId,
- entityData.applicationScope );
-
- final GraphManager gm = managerCache.getGraphManager( entityData.applicationScope );
-
- //get each edge from this node as a source
- return EdgesFromSourceObservable.edgesFromSource( gm, entityData.entityId )
-
- //for each edge, re-index it in v2 every 1000 edges or less
- .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
- @Override
- public void call( final List<Edge> edges ) {
-
- final MutationBatch batch = keyspace.prepareMutationBatch();
-
- for ( final Edge edge : edges ) {
- logger.info( "Migrating meta for edge {}", edge );
- final MutationBatch edgeBatch =
- v2Serialization.writeEdge( entityData.applicationScope, edge );
- batch.mergeShallow( edgeBatch );
- }
-
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to perform migration", e );
- }
-
- //update the observer so the admin can see it
- final long newCount = counter.addAndGet( edges.size() );
-
- observer.update( getVersion(), String.format("Currently running. Rewritten %d edge types", newCount) );
-
-
- }
- } );
+ public Observable<List<Edge>> call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+
+ //emit a stream of all ids from this group
+ return Observable.from( applicationEntityGroup.entityIds )
+ .flatMap( new Func1<Id, Observable<List<Edge>>>() {
+
+
+ //for each id in the group, get it's edges
+ @Override
+ public Observable<List<Edge>> call( final Id id ) {
+ logger.info( "Migrating edges from node {} in scope {}", id,
+ applicationEntityGroup.applicationScope );
+
+ final GraphManager gm = managerCache
+ .getGraphManager( applicationEntityGroup.applicationScope );
+
+ //get each edge from this node as a source
+ return EdgesFromSourceObservable.edgesFromSource( gm, id )
+
+ //for each edge, re-index it in v2 every 1000 edges or less
+ .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
+ @Override
+ public void call( final List<Edge> edges ) {
+
+ final MutationBatch batch =
+ keyspace.prepareMutationBatch();
+
+ for ( final Edge edge : edges ) {
+ logger.info( "Migrating meta for edge {}", edge );
+ final MutationBatch edgeBatch = v2Serialization
+ .writeEdge(
+ applicationEntityGroup
+ .applicationScope,
+ edge );
+ batch.mergeShallow( edgeBatch );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException(
+ "Unable to perform migration", e );
+ }
+
+ //update the observer so the admin can see it
+ final long newCount =
+ counter.addAndGet( edges.size() );
+
+ observer.update( getVersion(), String.format(
+ "Currently running. Rewritten %d edge types",
+ newCount ) );
+ }
+ } );
+ }
+ } );
}
} ).toBlocking().lastOrDefault( null );
+ ;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
index 291bbe9..771b81f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.rx;
+import java.util.List;
+
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -40,14 +42,17 @@ public class AllEntitiesInSystemObservable {
/**
* Return an observable that emits all entities in the system.
+ * @param managerCache the managerCache to use
+ * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
+ * you may be more than 1 ApplicationEntityGroup with the same application and different ids
*/
- public static Observable<EntityData> getAllEntitiesInSystem( final ManagerCache managerCache) {
+ public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) {
//traverse all nodes in the graph, load all source edges from them, then re-save the meta data
return ApplicationObservable.getAllApplicationIds( managerCache )
- .flatMap( new Func1<Id, Observable<EntityData>>() {
+ .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
@Override
- public Observable<EntityData> call( final Id applicationId ) {
+ public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
//set up our application scope and graph manager
final ApplicationScope applicationScope = new ApplicationScopeImpl(
@@ -68,11 +73,11 @@ public class AllEntitiesInSystemObservable {
//merge both the specified application node and the entity node
// so they all get used
- return Observable.merge( applicationNode, entityNodes )
- .map( new Func1<Id, EntityData>() {
+ return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
+ .map( new Func1<List<Id>, ApplicationEntityGroup>() {
@Override
- public EntityData call( final Id id ) {
- return new EntityData( applicationScope, id );
+ public ApplicationEntityGroup call( final List<Id> id ) {
+ return new ApplicationEntityGroup( applicationScope, id );
}
} );
}
@@ -83,14 +88,14 @@ public class AllEntitiesInSystemObservable {
/**
* Get the entity data. Immutable bean for fast access
*/
- public static final class EntityData {
+ public static final class ApplicationEntityGroup {
public final ApplicationScope applicationScope;
- public final Id entityId;
+ public final List<Id> entityIds;
- public EntityData( final ApplicationScope applicationScope, final Id entityId ) {
+ public ApplicationEntityGroup( final ApplicationScope applicationScope, final List<Id> entityIds ) {
this.applicationScope = applicationScope;
- this.entityId = entityId;
+ this.entityIds = entityIds;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index fa9f9df..9d0c9e6 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.fasterxml.uuid.UUIDComparator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -220,9 +221,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
EntityManager em = app.getEntityManager();
- CollectionScope cs = new CollectionScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
- new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
- CpNamingUtils.getCollectionScopeNameFromEntityType( eref.getType() ) );
+ CollectionScope cs = getCollectionScopeNameFromEntityType( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ), eref.getType() );
EntityCollectionManagerFactory ecmf = CpSetup.getInjector().getInstance( EntityCollectionManagerFactory.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index dafdb00..1f0665a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -46,7 +45,6 @@ import rx.functions.Action1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -75,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
@Test
public void testIdMapping() throws Throwable {
- assertEquals("version 1 expected", 1, entityTypeMappingMigration.getVersion());
+ assertEquals( "version 1 expected", 1, entityTypeMappingMigration.getVersion() );
final EntityManager newAppEm = app.getEntityManager();
@@ -87,7 +85,6 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
-
final Set<Id> allEntities = new HashSet<>();
allEntities.addAll( type1Identities );
allEntities.addAll( type2Identities );
@@ -106,38 +103,45 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
entityTypeMappingMigration.migrate( progressObserver );
-
-
-
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
//ensure that each one has a type
- try {
-
- final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
- final Entity returned = em.get( entity.entityId.getUuid() );
-
- //we seem to occasionally get phantom edges. If this is the case we'll store the type _> uuid mapping, but we won't have anything to load
- if(returned != null) {
- assertEquals( entity.entityId.getUuid(), returned.getUuid() );
- assertEquals( entity.entityId.getType(), returned.getType() );
- }
- else {
- final String type = managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope(
- entity.applicationScope.getApplication() ) )
- .getString( entity.entityId.getUuid().toString() );
-
- assertEquals(entity.entityId.getType(), type);
- }
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Unable to get entity " + entity.entityId
- + " by UUID, migration failed", e );
- }
- allEntities.remove( entity.entityId );
+ final EntityManager em = emf.getEntityManager(
+ entity.applicationScope.getApplication().getUuid() );
+
+ for ( final Id id : entity.entityIds ) {
+ try {
+ final Entity returned = em.get( id.getUuid() );
+
+ //we seem to occasionally get phantom edges. If this is the
+ // case we'll store the type _> uuid mapping, but we won't have
+ // anything to load
+
+ if ( returned != null ) {
+ assertEquals( id.getUuid(), returned.getUuid() );
+ assertEquals( id.getType(), returned.getType() );
+ }
+ else {
+ final String type = managerCache.getMapManager( CpNamingUtils
+ .getEntityTypeMapScope(
+ entity.applicationScope.getApplication() ) )
+ .getString( id.getUuid()
+ .toString() );
+
+ assertEquals( id.getType(), type );
+ }
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to get entity " + id
+ + " by UUID, migration failed", e );
+ }
+
+ allEntities.remove( id );
+ }
}
} ).toBlocking().lastOrDefault( null );
@@ -145,9 +149,5 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
assertEquals( "Every element should have been encountered", 0, allEntities.size() );
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
-
-
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index aab47a0..88c02cd 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -32,12 +32,9 @@ import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
-import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
-import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -114,40 +111,41 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+ public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( entity.entityId, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- sourceTypes.put( entity.entityId, s );
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget(
- new SimpleSearchEdgeType( entity.entityId, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.put( entity.entityId, s );
- }
- } ).toBlocking().lastOrDefault( null );
-
- allEntities.remove( entity.entityId );
+ for(final Id id: entity.entityIds) {
+ /**
+ * Get our edge types from the source
+ */
+ gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ sourceTypes.put( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+
+ /**
+ * Get the edge types to the target
+ */
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
+ null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ targetTypes.put( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+ allEntities.remove( id );
+ }
}
} ).toBlocking().lastOrDefault( null );
@@ -169,42 +167,66 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
- .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( entity.entityId, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- sourceTypes.remove( entity.entityId, s );
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget(
- new SimpleSearchEdgeType( entity.entityId, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.remove( entity.entityId, s );
- }
- } ).toBlocking().lastOrDefault( null );
+ for ( final Id id : entity.entityIds ) {
+ /**
+ * Get our edge types from the source
+ */
+ gm.getEdgeTypesFromSource(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ sourceTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+
+ /**
+ * Get the edge types to the target
+ */
+ gm.getEdgeTypesToTarget(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ targetTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+ }
+ }
}
- } ).toBlocking().lastOrDefault( null );
- assertEquals( "All source types migrated", 0, sourceTypes.size() );
- assertEquals( "All target types migrated", 0, targetTypes.size() );
+
+ ).
+
+
+ toBlocking()
+
+
+ .
+
+
+ lastOrDefault( null );
+
+
+ assertEquals( "All source types migrated",0,sourceTypes.size( )
+
+
+ );
+
+
+ assertEquals( "All target types migrated",0,targetTypes.size( )
+
+
+ );
+ }
}
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9ed79642/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index 423dc1f..4d1c6c9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.corepersistence.rx;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
@@ -33,13 +32,11 @@ import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.SimpleEntityRef;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import rx.functions.Action1;
@@ -95,26 +92,28 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+ public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
assertNotNull(entity);
assertNotNull(entity.applicationScope);
- assertNotNull(entity.entityId);
+ assertNotNull(entity.entityIds);
//not from our test, don't check it
if(!applicationId.equals( entity.applicationScope.getApplication() )){
return;
}
+ for(Id id: entity.entityIds) {
- //we should only emit each node once
- if ( entity.entityId.getType().equals( type1 ) ) {
- assertTrue( "Element should be present on removal", type1Identities.remove( entity.entityId ) );
- }
- else if ( entity.entityId.getType().equals( type2 ) ) {
- assertTrue( "Element should be present on removal", type2Identities.remove( entity.entityId ) );
+ //we should only emit each node once
+ if ( id.getType().equals( type1 ) ) {
+ assertTrue( "Element should be present on removal", type1Identities.remove( id ) );
+ }
+ else if ( id.getType().equals( type2 ) ) {
+ assertTrue( "Element should be present on removal", type2Identities.remove( id ) );
+ }
}
}
} ).toBlocking().lastOrDefault( null );
[3/3] incubator-usergrid git commit: Finished migration testing. All
core tests pass.
Posted by to...@apache.org.
Finished migration testing. All core tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e32d5212
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e32d5212
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e32d5212
Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: e32d5212af3c53cc84c9a505cb9bb6824e6a16f2
Parents: 364a6ea
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 15:48:19 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 15:48:19 2014 -0700
----------------------------------------------------------------------
.../migration/EntityDataMigration.java | 90 ++++---
.../rx/ApplicationObservable.java | 65 +++--
.../migration/EntityDataMigrationIT.java | 251 +++++++++++++++++++
.../migration/GraphShardVersionMigrationIT.java | 131 +++++-----
4 files changed, 394 insertions(+), 143 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
index 79d31a8..767574d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.Iterator;
import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
@@ -53,9 +51,6 @@ import rx.functions.Action1;
public class EntityDataMigration implements DataMigration {
- private static final Logger logger = LoggerFactory.getLogger( EntityDataMigration.class );
-
-
private final MvccEntitySerializationStrategy v1Serialization;
private final MvccEntitySerializationStrategy v2Serialization;
@@ -77,58 +72,71 @@ public class EntityDataMigration implements DataMigration {
@Override
public void migrate( final ProgressObserver observer ) throws Throwable {
+ final AtomicLong atomicLong = new AtomicLong();
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext(
- new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup
+ applicationEntityGroup ) {
- @Override
- public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup applicationEntityGroup ) {
+ final UUID now = UUIDGenerator.newTimeUUID();
- final UUID now = UUIDGenerator.newTimeUUID();
+ final Id appScopeId =
+ applicationEntityGroup.applicationScope.getApplication();
- final Id appScopeId = applicationEntityGroup.applicationScope.getApplication();
+ final MutationBatch totalBatch = keyspace.prepareMutationBatch();
- final MutationBatch totalBatch = keyspace.prepareMutationBatch();
+ //go through each entity in the system, and load it's entire
+ // history
+ for ( Id entityId : applicationEntityGroup.entityIds ) {
- for ( Id entityId : applicationEntityGroup.entityIds ) {
+ CollectionScope currentScope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType( appScopeId,
+ entityId.getType() );
- CollectionScope currentScope = CpNamingUtils.getCollectionScopeNameFromEntityType(
- appScopeId, entityId.getType() );
+ //for each element in the history in the previous version,
+ // copy it to the CF in v2
+ Iterator<MvccEntity> allVersions = v1Serialization
+ .loadDescendingHistory( currentScope, entityId, now,
+ 1000 );
- Iterator<MvccEntity> allVersions =
- v1Serialization.loadDescendingHistory( currentScope, entityId, now, 1000 );
+ while ( allVersions.hasNext() ) {
+ final MvccEntity version = allVersions.next();
- while ( allVersions.hasNext() ) {
- final MvccEntity version = allVersions.next();
+ final MutationBatch versionBatch =
+ v2Serialization.write( currentScope, version );
- final MutationBatch versionBatch = v2Serialization.write( currentScope, version );
+ totalBatch.mergeShallow( versionBatch );
+
+ if ( atomicLong.incrementAndGet() % 50 == 0 ) {
+ executeBatch( totalBatch, observer, atomicLong );
+ }
+ }
+ }
+
+ executeBatch( totalBatch, observer, atomicLong );
+ }
+ } ).toBlocking().last();
+ }
- totalBatch.mergeShallow( versionBatch );
- if ( totalBatch.getRowCount() >= 50 ) {
- try {
- totalBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new DataMigrationException( "Unable to migrate batches ", e );
- }
- }
- }
- }
+ private void executeBatch( final MutationBatch batch, final ProgressObserver po, final AtomicLong count ) {
+ try {
+ batch.execute();
- try {
- totalBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new DataMigrationException( "Unable to migrate batches ", e );
- }
- }
- } ).toBlocking().last();
+ po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
+ }
+ catch ( ConnectionException e ) {
+ po.failed( getVersion(), "Failed to execute mutation in cassandra" );
+ throw new DataMigrationException( "Unable to migrate batches ", e );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 898812b..6019bca 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -23,18 +23,20 @@ package org.apache.usergrid.corepersistence.rx;
import java.util.Arrays;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -47,14 +49,12 @@ import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicat
/**
* An observable that will emit all application stored in the system.
*/
-public class ApplicationObservable {
-
+public class ApplicationObservable {
+ private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
/**
* Get all applicationIds as an observable
- * @param managerCache
- * @return
*/
public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
@@ -62,22 +62,20 @@ public class ApplicationObservable {
//this way consumers can perform whatever work they need to on the root system first
- final Observable<Id> systemIds = Observable.from( Arrays.asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
+ final Observable<Id> systemIds = Observable.from( Arrays
+ .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
- final CollectionScope appInfoCollectionScope = new CollectionScopeImpl(
- appScope.getApplication(),
- appScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ));
+ final CollectionScope appInfoCollectionScope =
+ new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
- final EntityCollectionManager
- collectionManager = managerCache.getEntityCollectionManager( appInfoCollectionScope );
+ final EntityCollectionManager collectionManager =
+ managerCache.getEntityCollectionManager( appInfoCollectionScope );
final GraphManager gm = managerCache.getGraphManager( appScope );
@@ -97,21 +95,34 @@ public class ApplicationObservable {
//get the app info and load it
final Id appInfo = edge.getTargetNode();
- return collectionManager.load( appInfo ).map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+ return collectionManager.load( appInfo )
+ //filter out null entities
+ .filter( new Func1<Entity, Boolean>() {
+ @Override
+ public Boolean call( final Entity entity ) {
+ if ( entity == null ) {
+ logger.warn( "Encountered a null application info for id {}", appInfo );
+ return false;
+ }
+ return true;
+ }
+ } )
+ //get the id from the entity
+ .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
- @Override
- public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
- final UUID uuid = (UUID )entity.getField( "applicationUuid" ).getValue();
- return CpNamingUtils.generateApplicationId(uuid);
- }
- } );
+ @Override
+ public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+ final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+ return CpNamingUtils.generateApplicationId( uuid );
+ }
+ } );
}
} );
- return Observable.merge( systemIds, appIds);
+ return Observable.merge( systemIds, appIds );
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
new file mode 100644
index 0000000..5c9e14c
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.CurrentImpl;
+import org.apache.usergrid.persistence.core.guice.PreviousImpl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Injector;
+import com.google.inject.Key;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class EntityDataMigrationIT extends AbstractCoreIT {
+
+
+ private Injector injector;
+
+
+ private EntityDataMigration entityDataMigration;
+ private ManagerCache managerCache;
+ private DataMigrationManager dataMigrationManager;
+ private MigrationInfoSerialization migrationInfoSerialization;
+ private MvccEntitySerializationStrategy v1Strategy;
+ private MvccEntitySerializationStrategy v2Strategy;
+ private EntityManagerFactory emf;
+
+
+ @Before
+ public void setup() {
+ emf = setup.getEmf();
+ injector = CpSetup.getInjector();
+ entityDataMigration = injector.getInstance( EntityDataMigration.class );
+ managerCache = injector.getInstance( ManagerCache.class );
+ dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+ migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
+ v1Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, PreviousImpl.class) );
+ v2Strategy = injector.getInstance( Key.get(MvccEntitySerializationStrategy.class, CurrentImpl.class) );
+ }
+
+
+ @Test
+ public void testDataMigration() throws Throwable {
+
+ assertEquals( "version 3 expected", 3, entityDataMigration.getVersion() );
+
+
+ /**
+ * Reset to our version -1 and start the migration
+ */
+ dataMigrationManager.resetToVersion( entityDataMigration.getVersion() - 1 );
+
+
+ final EntityManager newAppEm = app.getEntityManager();
+
+ final String type1 = "type1thing";
+ final String type2 = "type2thing";
+ final int size = 10;
+
+ final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+ final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+ final Set<Id> createdEntityIds = new HashSet<>();
+ createdEntityIds.addAll( type1Identities );
+ createdEntityIds.addAll( type2Identities );
+
+
+ final TestProgressObserver progressObserver = new TestProgressObserver();
+
+
+ //load everything that appears in v1, migrate and ensure it appears in v2
+ final Set<MvccEntity> savedEntities = new HashSet<>( 10000 );
+ //set that holds all entityIds for later assertion
+ final Set<Id> entityIds = new HashSet<>(10000);
+
+
+ //read everything in previous version format and put it into our types. Assumes we're
+ //using a test system, and it's not a huge amount of data, otherwise we'll overflow.
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+ CollectionScope scope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType(
+ entity.applicationScope.getApplication(),
+ id.getType() );
+
+ final Iterator<MvccEntity> versions = v1Strategy
+ .loadDescendingHistory( scope, id, UUIDGenerator.newTimeUUID(),
+ 100 );
+
+ while ( versions.hasNext() ) {
+
+ final MvccEntity mvccEntity = versions.next();
+
+ savedEntities.add( mvccEntity );
+
+ createdEntityIds.remove( mvccEntity.getId() );
+
+ entityIds.add( id );
+ }
+ }
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+ assertEquals( "Newly saved entities encountered", 0, createdEntityIds.size() );
+ assertTrue( "Saved new entities", savedEntities.size() > 0 );
+
+ //perform the migration
+ entityDataMigration.migrate( progressObserver );
+
+ assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+ assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+ //write the status and version, then invalidate the cache so it appears
+ migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
+ migrationInfoSerialization.setVersion( entityDataMigration.getVersion() );
+ dataMigrationManager.invalidate();
+
+ assertEquals( "New version saved, and we should get new implementation", entityDataMigration.getVersion(),
+ dataMigrationManager.getCurrentVersion() );
+
+
+ //now visit all entities in the system again, load them from v2, and ensure they're the same
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+ CollectionScope scope = CpNamingUtils
+ .getCollectionScopeNameFromEntityType(
+ entity.applicationScope.getApplication(),
+ id.getType() );
+
+ final Iterator<MvccEntity> versions = v2Strategy
+ .loadDescendingHistory( scope, id,
+ UUIDGenerator.newTimeUUID(), 100 );
+
+ while ( versions.hasNext() ) {
+
+ final MvccEntity mvccEntity = versions.next();
+
+ savedEntities.remove( mvccEntity );
+ }
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals( "All entities migrated", 0, savedEntities.size() );
+
+
+ //now visit all entities in the system again, and load them from the EM, ensure we see everything we did in the v1 traversal
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+
+ final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
+
+ //add all versions from history to our comparison
+ for ( final Id id : entity.entityIds ) {
+
+
+ try {
+ final Entity emEntity = em.get( SimpleEntityRef.fromId( id ) );
+
+ if(emEntity != null){
+ entityIds.remove( id );
+ }
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException("Error loading entity", e);
+ }
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals("All entities could be loaded by the entity manager", 0, entityIds.size());
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e32d5212/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 88c02cd..51ea052 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.Injector;
-import com.netflix.astyanax.Keyspace;
import rx.functions.Action1;
@@ -55,7 +54,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
private Injector injector;
private GraphShardVersionMigration graphShardVersionMigration;
- private Keyspace keyspace;
private ManagerCache managerCache;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
@@ -65,7 +63,6 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
public void setup() {
injector = CpSetup.getInjector();
graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
- keyspace = injector.getInstance( Keyspace.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
@@ -75,15 +72,12 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
@Test
public void testIdMapping() throws Throwable {
- assertEquals("version 2 expected", 2, graphShardVersionMigration.getVersion());
+ assertEquals( "version 2 expected", 2, graphShardVersionMigration.getVersion() );
/**
* Reset to our version -1 and start the migration
*/
- dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion()-1 );
-
-
-
+ dataMigrationManager.resetToVersion( graphShardVersionMigration.getVersion() - 1 );
final EntityManager newAppEm = app.getEntityManager();
@@ -111,15 +105,16 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000)
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
.doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
@Override
- public void call( final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
+ public void call(
+ final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
final GraphManager gm =
managerCache.getGraphManager( entity.applicationScope );
- for(final Id id: entity.entityIds) {
+ for ( final Id id : entity.entityIds ) {
/**
* Get our edge types from the source
*/
@@ -135,8 +130,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
/**
* Get the edge types to the target
*/
- gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id,
- null, null ) )
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( id, null, null ) )
.doOnNext( new Action1<String>() {
@Override
public void call( final String s ) {
@@ -153,7 +147,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//perform the migration
graphShardVersionMigration.migrate( progressObserver );
- assertEquals("Newly saved entities encounterd", 0, allEntities.size());
+ assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -163,70 +157,57 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
dataMigrationManager.invalidate();
- assertEquals("New version saved, and we should get new implementation", graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion());
+ assertEquals( "New version saved, and we should get new implementation",
+ graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion() );
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
.doOnNext( new Action1<AllEntitiesInSystemObservable.ApplicationEntityGroup>() {
- @Override
- public void call(
- final AllEntitiesInSystemObservable.ApplicationEntityGroup entity ) {
-
- final GraphManager gm =
- managerCache.getGraphManager( entity.applicationScope );
-
- for ( final Id id : entity.entityIds ) {
- /**
- * Get our edge types from the source
- */
- gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( id, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- sourceTypes.remove( id, s );
- }
- } ).toBlocking().lastOrDefault( null );
-
-
- /**
- * Get the edge types to the target
- */
- gm.getEdgeTypesToTarget(
- new SimpleSearchEdgeType( id, null, null ) )
- .doOnNext( new Action1<String>() {
- @Override
- public void call( final String s ) {
- targetTypes.remove( id, s );
- }
- } ).toBlocking().lastOrDefault( null );
- }
- }
- }
-
-
- ).
-
-
- toBlocking()
-
-
- .
-
-
- lastOrDefault( null );
-
-
- assertEquals( "All source types migrated",0,sourceTypes.size( )
-
-
- );
-
-
- assertEquals( "All target types migrated",0,targetTypes.size( )
-
-
- );
- }
+ @Override
+ public void call(
+ final AllEntitiesInSystemObservable
+ .ApplicationEntityGroup entity ) {
+
+ final GraphManager gm =
+ managerCache.getGraphManager( entity.applicationScope );
+
+ for ( final Id id : entity.entityIds ) {
+ /**
+ * Get our edge types from the source
+ */
+ gm.getEdgeTypesFromSource(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ sourceTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+
+
+ /**
+ * Get the edge types to the target
+ */
+ gm.getEdgeTypesToTarget(
+ new SimpleSearchEdgeType( id, null, null ) )
+ .doOnNext( new Action1<String>() {
+ @Override
+ public void call( final String s ) {
+ targetTypes.remove( id, s );
+ }
+ } ).toBlocking().lastOrDefault( null );
+ }
+ }
+ }
+
+
+ ).toBlocking().lastOrDefault( null );
+
+
+ assertEquals( "All source types migrated", 0, sourceTypes.size() );
+
+
+ assertEquals( "All target types migrated", 0, targetTypes.size() );
}
+}
[2/3] incubator-usergrid git commit: Refactored generation of
collection scope to be re-usable with CpNamingUtils for consistency across
modules
Posted by to...@apache.org.
Refactored generation of collection scope to be re-usable with CpNamingUtils for consistency across modules
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/364a6ea6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/364a6ea6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/364a6ea6
Branch: refs/heads/USERGRID-250-buffer-size-fix
Commit: 364a6ea6737194a1dd266b6d0e89abb4a40ddf96
Parents: 9ed7964
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 20 14:24:03 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 20 14:24:03 2014 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 44 ++++++--------------
.../corepersistence/CpEntityManagerFactory.java | 2 +-
.../corepersistence/CpRelationManager.java | 35 ++++------------
.../corepersistence/util/CpNamingUtils.java | 19 +++++++++
4 files changed, 41 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index e2f67e8..e6b8bce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -131,6 +131,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
import static org.apache.commons.lang.StringUtils.capitalize;
import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -372,13 +373,9 @@ public class CpEntityManager implements EntityManager {
}
Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entityRef.getType());
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
// throw new IllegalArgumentException(
@@ -457,13 +454,10 @@ public class CpEntityManager implements EntityManager {
String type = Schema.getDefaultSchema().getEntityType( entityClass );
Id id = new SimpleId( entityId, type );
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
- EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), type);
+
// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
// throw new IllegalArgumentException(
@@ -522,9 +516,8 @@ public class CpEntityManager implements EntityManager {
public void update( Entity entity ) throws Exception {
// first, update entity index in its own collection scope
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
+
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entity.getType());
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
@@ -590,9 +583,7 @@ public class CpEntityManager implements EntityManager {
private Observable deleteAsync( EntityRef entityRef ) throws Exception {
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+ CollectionScope collectionScope =getCollectionScopeNameFromEntityType(applicationScope.getApplication(), entityRef.getType() );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -639,7 +630,7 @@ public class CpEntityManager implements EntityManager {
// deindex from default index scope
IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+ getCollectionScopeNameFromEntityType( entityRef.getType() ) );
batch.deindex( defaultIndexScope, entity );
@@ -1010,15 +1001,10 @@ public class CpEntityManager implements EntityManager {
@Override
public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
-
- String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
-
- CollectionScope collectionScope =
- new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
- collectionName );
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(getApplicationScope().getApplication(), entityRef.getType());
IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+ getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
@@ -2187,10 +2173,7 @@ public class CpEntityManager implements EntityManager {
private Id getIdForUniqueEntityField( final String collectionName, final String propertyName,
final Object propertyValue ) {
- CollectionScope collectionScope =
- new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( collectionName ) );
-
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), collectionName);
final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
@@ -2514,9 +2497,8 @@ public class CpEntityManager implements EntityManager {
org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityToCpEntity( entity, importId );
// prepare to write and index Core Persistence Entity into default scope
- CollectionScope collectionScope =
- new CollectionScopeImpl( applicationScope.getApplication(), applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( eType ) );
+ CollectionScope collectionScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), eType);
+
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
if ( logger.isDebugEnabled() ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e07bb00..82e80a5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -541,7 +541,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public long performEntityCount() {
//TODO, this really needs to be a task that writes this data somewhere since this will get
//progressively slower as the system expands
- return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache ).longCount().toBlocking().last();
+ return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).longCount().toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 8514504..dcb4ba1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -120,6 +120,7 @@ import rx.functions.Func1;
import static java.util.Arrays.asList;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -219,10 +220,7 @@ public class CpRelationManager implements RelationManager {
this.indexBucketLocator = indexBucketLocator; // TODO: this also
// load the Core Persistence version of the head entity as well
- this.headEntityScope = new CollectionScopeImpl(
- this.applicationScope.getApplication(),
- this.applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( headEntity.getType() ) );
+ this.headEntityScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), headEntity.getType());
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading head entity {}:{} from scope\n app {}\n owner {}\n name {}",
@@ -606,10 +604,7 @@ public class CpRelationManager implements RelationManager {
public Entity addToCollection( String collName, EntityRef itemRef, boolean connectBack )
throws Exception {
- CollectionScope memberScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
org.apache.usergrid.persistence.model.entity.Entity memberEntity =
@@ -642,10 +637,7 @@ public class CpRelationManager implements RelationManager {
}
// load the new member entity to be added to the collection from its default scope
- CollectionScope memberScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
//TODO, this double load should disappear once events are in
Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
@@ -797,11 +789,7 @@ public class CpRelationManager implements RelationManager {
}
// load the entity to be removed to the collection
- CollectionScope memberScope = new CollectionScopeImpl(
- this.applicationScope.getApplication(),
- this.applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( itemRef.getType() ) );
- EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager( memberScope );
+ CollectionScope memberScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), itemRef.getType());
if ( logger.isDebugEnabled() ) {
logger.debug( "Loading entity to remove from collection "
@@ -1017,10 +1005,7 @@ public class CpRelationManager implements RelationManager {
ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef );
- CollectionScope targetScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType() ) );
+ CollectionScope targetScope = getCollectionScopeNameFromEntityType(applicationScope.getApplication(), connectedEntityRef.getType());
if ( logger.isDebugEnabled() ) {
logger.debug("createConnection(): "
@@ -1249,12 +1234,8 @@ public class CpRelationManager implements RelationManager {
String connectionType = connectionRef.getConnectedEntity().getConnectionType();
- CollectionScope targetScope = new CollectionScopeImpl(
- applicationScope.getApplication(),
- applicationScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( connectedEntityRef.getType()) );
-
- EntityCollectionManager targetEcm = managerCache.getEntityCollectionManager( targetScope );
+ CollectionScope targetScope = getCollectionScopeNameFromEntityType( applicationScope.getApplication(),
+ connectedEntityRef.getType() );
if ( logger.isDebugEnabled() ) {
logger.debug( "Deleting connection '{}' from source {}:{} \n to target {}:{}",
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/364a6ea6/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 6fb35a3..684b6e0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -22,6 +22,8 @@ package org.apache.usergrid.corepersistence.util;
import java.util.UUID;
import org.apache.usergrid.persistence.Schema;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.Application;
@@ -70,6 +72,23 @@ public class CpNamingUtils {
public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
+ /**
+ * Generate a collection scope for a collection within the application's Id for the given type
+ * @param applicationId The applicationId that owns this entity
+ * @param type The type in the collection
+ * @return The collectionScope
+ */
+ public static CollectionScope getCollectionScopeNameFromEntityType(final Id applicationId, final String type){
+ return
+ new CollectionScopeImpl( applicationId, applicationId,
+ getCollectionScopeNameFromEntityType( type ) );
+ }
+
+ /**
+ * Get the collection name from the entity/id type
+ * @param type
+ * @return
+ */
public static String getCollectionScopeNameFromEntityType( String type ) {
String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
return csn.toLowerCase();