You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/17 21:12:54 UTC

[44/50] [abbrv] incubator-usergrid git commit: Finished integration testing migrations. Also added pending tasks API call to ES

Finished integration testing migrations.  Also added pending tasks API call to ES


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3775eb2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3775eb2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3775eb2d

Branch: refs/heads/two-dot-o-events
Commit: 3775eb2d29ab6d33e6d733d945fd038be44680ba
Parents: 7deba17
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Nov 13 17:36:56 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Nov 13 17:36:56 2014 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   5 +-
 .../migration/EntityTypeMappingMigration.java   |   5 +-
 .../migration/GraphShardVersionMigration.java   |   2 +-
 .../corepersistence/rx/TargetIdObservable.java  |   2 +-
 .../corepersistence/util/CpNamingUtils.java     |  13 ++
 .../corepersistence/EntityWriteHelper.java      |  59 +++++
 .../migration/EntityTypeMappingMigrationIT.java | 156 ++++++++++++++
 .../migration/GraphShardVersionMigrationIT.java | 213 +++++++++++++++++++
 .../migration/TestProgressObserver.java         |  71 +++++++
 .../rx/AllEntitiesInSystemObservableIT.java     |  19 +-
 .../rx/EdgesFromSourceObservableIT.java         |  16 +-
 .../rx/EdgesToTargetObservableIT.java           |  18 +-
 .../rx/TargetIdObservableTestIT.java            |  18 +-
 .../migration/data/DataMigrationManager.java    |   5 +
 .../data/DataMigrationManagerImpl.java          |   6 +
 .../map/impl/MapSerializationImpl.java          |   4 +-
 .../usergrid/persistence/index/EntityIndex.java |   6 +
 .../index/impl/EsEntityIndexImpl.java           |  11 +
 .../impl/EntityConnectionIndexImplTest.java     |  11 +-
 .../persistence/index/impl/EsTestUtils.java     |  48 +++++
 20 files changed, 617 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2158bd1..2cb01d4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -91,7 +91,6 @@ import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.Field;
@@ -2227,7 +2226,9 @@ public class CpEntityManager implements EntityManager {
      */
     private MapManager getMapManagerForTypes() {
         Id mapOwner = new SimpleId( applicationId, TYPE_APPLICATION );
-        MapScope ms = new MapScopeImpl( mapOwner, CpNamingUtils.TYPES_BY_UUID_MAP );
+
+        final MapScope ms = CpNamingUtils.getEntityTypeMapScope( mapOwner );
+
         MapManager mm = managerCache.getMapManager( ms );
 
         return mm;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index f2712cc..1adfe73 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -72,11 +72,10 @@ public class EntityTypeMappingMigration implements DataMigration {
                                          @Override
                                          public void call( final AllEntitiesInSystemObservable.EntityData entityData ) {
 
-                                             final MapScope ms = new MapScopeImpl( entityData.entityId,
-                                                     CpNamingUtils.TYPES_BY_UUID_MAP );
+                                             final MapScope ms = CpNamingUtils.getEntityTypeMapScope( entityData.applicationScope.getApplication() );
 
 
-                                             final MapManager mapManager = managerCache.getMapManager(  ms );
+                                             final MapManager mapManager = managerCache.getMapManager( ms );
 
                                              final UUID entityUuid = entityData.entityId.getUuid();
                                              final String entityType = entityData.entityId.getType();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index ad45dab..ac4cd58 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -115,7 +115,7 @@ public class GraphShardVersionMigration implements DataMigration {
                                         //update the observer so the admin can see it
                                         final long newCount = counter.addAndGet( edges.size() );
 
-                                        observer.update( getVersion(), String.format("Finished re-writing %d edges", newCount) );
+                                        observer.update( getVersion(), String.format("Currently running.  Rewritten %d edge types", newCount) );
 
 
                                     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
index 3925aae..c4b6526 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
@@ -56,7 +56,7 @@ public class TargetIdObservable {
            public Id call( final Edge edge ) {
                final Id targetNode = edge.getTargetNode();
 
-               logger.info( "Emitting targetId of {}", edge );
+               logger.debug( "Emitting targetId of {}", edge );
 
 
                return targetNode;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 52449de..6fb35a3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
@@ -152,4 +154,15 @@ public class CpNamingUtils {
     public static Id generateApplicationId( UUID applicationId ) {
         return new SimpleId( applicationId, Application.ENTITY_TYPE );
     }
+
+
+    /**
+     * Get the map scope for the applicationId to store entity uuid to type mapping
+     *
+     * @param applicationId the Id of the application that owns the map
+     * @return the MapScope for that owner's TYPES_BY_UUID_MAP map
+     */
+    public static MapScope getEntityTypeMapScope( final Id applicationId ){
+        return new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/EntityWriteHelper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/EntityWriteHelper.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/EntityWriteHelper.java
new file mode 100644
index 0000000..f497dd1
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/EntityWriteHelper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence;
+
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+
+public class EntityWriteHelper {
+
+
+    /**
+     * Write some generic entity data into a collection with the entity manager and type specified
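+     * @param em the entity manager used to create the entities
+     * @param type the entity type to create
+     * @param size the number of entities to create
+     * @return the set of Ids (uuid and type) of the created entities
+     * @throws Exception if em.create fails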
+     */
+    public static Set<Id> createTypes( final EntityManager em, final String type, final int size ) throws Exception {
+
+        final Set<Id> identities = new HashSet<>();
+
+        for ( int i = 0; i < size; i++ ) {
+            final Entity entity = em.create( type, new HashMap<String, Object>() {{
+                put( "property", "value" );
+            }} );
+            final Id createdId = new SimpleId( entity.getUuid(), entity.getType() );
+
+            identities.add( createdId );
+        }
+
+        return identities;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
new file mode 100644
index 0000000..d2c31ff
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Injector;
+import com.netflix.astyanax.Keyspace;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+
+public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
+
+
+    private Injector injector;
+
+
+    private EntityTypeMappingMigration entityTypeMappingMigration;
+    private Keyspace keyspace;
+    private MigrationManager migrationManager;
+    private EntityManagerFactory emf;
+    private ManagerCache managerCache;
+
+
+    @Before
+    public void setup() {
+        injector = CpSetup.getInjector();
+        emf = setup.getEmf();
+        entityTypeMappingMigration = injector.getInstance( EntityTypeMappingMigration.class );
+        keyspace = injector.getInstance( Keyspace.class );
+        migrationManager = injector.getInstance( MigrationManager.class );
+        managerCache = injector.getInstance( ManagerCache.class );
+    }
+
+
+    @Test
+    public void testIdMapping() throws Throwable {
+
+
+        final EntityManager newAppEm = app.getEntityManager();
+
+        final String type1 = "type1thing";
+        final String type2 = "type2thing";
+        final int size = 10;
+
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+
+        final Set<Id> allEntities = new HashSet<>();
+        allEntities.addAll( type1Identities );
+        allEntities.addAll( type2Identities );
+
+
+        /**
+         * Drop our map column families after doing our writes, to ensure we have no entries before migrating.
+         * This will ensure any mapping data we find afterwards was written by the migration
+         */
+        keyspace.dropColumnFamily( MapSerializationImpl.MAP_ENTRIES );
+        keyspace.dropColumnFamily( MapSerializationImpl.MAP_KEYS );
+
+        //create the column families again
+        migrationManager.migrate();
+
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+        entityTypeMappingMigration.migrate( progressObserver );
+
+
+
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+                                         @Override
+                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+                                             //ensure that each one has a type
+                                             try {
+
+                                                 final EntityManager em = emf.getEntityManager( entity.applicationScope.getApplication().getUuid() );
+                                                 final Entity returned = em.get( entity.entityId.getUuid() );
+
+                                                 //we seem to occasionally get phantom edges.  If this is the case we'll store the uuid -> type mapping, but we won't have anything to load
+                                                if(returned != null) {
+                                                    assertEquals( entity.entityId.getUuid(), returned.getUuid() );
+                                                    assertEquals( entity.entityId.getType(), returned.getType() );
+                                                }
+                                                else {
+                                                    final String type = managerCache.getMapManager( CpNamingUtils.getEntityTypeMapScope(
+                                                            entity.applicationScope.getApplication() ) )
+                                                            .getString( entity.entityId.getUuid().toString() );
+
+                                                    assertEquals(entity.entityId.getType(), type);
+                                                }
+                                             }
+                                             catch ( Exception e ) {
+                                                 throw new RuntimeException( "Unable to get entity " + entity.entityId
+                                                         + " by UUID, migration failed", e );
+                                             }
+
+                                             allEntities.remove( entity.entityId );
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+
+
+        assertEquals( "Every element should have been encountered", 0, allEntities.size() );
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
new file mode 100644
index 0000000..e16c60d
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.AbstractCoreIT;
+import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization;
+import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerializationImpl;
+import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.inject.Injector;
+import com.netflix.astyanax.Keyspace;
+
+import rx.functions.Action1;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class GraphShardVersionMigrationIT extends AbstractCoreIT {
+
+    private Injector injector;
+    private GraphShardVersionMigration graphShardVersionMigration;
+    private Keyspace keyspace;
+    private MigrationManager migrationManager;
+    private EntityManagerFactory emf;
+    private ManagerCache managerCache;
+    private DataMigrationManager dataMigrationManager;
+    private MigrationInfoSerialization migrationInfoSerialization;
+
+
+    @Before
+    public void setup() {
+        injector = CpSetup.getInjector();
+        emf = setup.getEmf();
+        graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+        keyspace = injector.getInstance( Keyspace.class );
+        migrationManager = injector.getInstance( MigrationManager.class );
+        managerCache = injector.getInstance( ManagerCache.class );
+        dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+        migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
+    }
+
+
+    @Test
+    public void testIdMapping() throws Throwable {
+
+
+        /**
+         * Drop our migration info column family to ensure we don't have a "new version" in there.
+         * This will ensure the data we write is in the "old data" version format
+         */
+        keyspace.dropColumnFamily( MigrationInfoSerializationImpl.CF_MIGRATION_INFO );
+
+        //create the column families again
+        migrationManager.migrate();
+
+        final EntityManager newAppEm = app.getEntityManager();
+
+        final String type1 = "type1thing";
+        final String type2 = "type2thing";
+        final int size = 10;
+
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( newAppEm, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( newAppEm, type2, size );
+
+
+        final Set<Id> allEntities = new HashSet<>();
+        allEntities.addAll( type1Identities );
+        allEntities.addAll( type2Identities );
+
+
+        final TestProgressObserver progressObserver = new TestProgressObserver();
+
+
+        //used to validate 1.0 types, and 2.0 types
+        final Multimap<Id, String> sourceTypes = HashMultimap.create( 10000, 10 );
+        final Multimap<Id, String> targetTypes = HashMultimap.create( 10000, 10 );
+
+
+        //read everything in previous version format and put it into our types.
+
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+                                         @Override
+                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+
+                                             final GraphManager gm =
+                                                     managerCache.getGraphManager( entity.applicationScope );
+
+                                             /**
+                                              * Get our edge types from the source
+                                              */
+                                             gm.getEdgeTypesFromSource(
+                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
+                                               .doOnNext( new Action1<String>() {
+                                                   @Override
+                                                   public void call( final String s ) {
+                                                       sourceTypes.put( entity.entityId, s );
+                                                   }
+                                               } ).toBlocking().lastOrDefault( null );
+
+
+                                             /**
+                                              * Get the edge types to the target
+                                              */
+                                             gm.getEdgeTypesToTarget(
+                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
+                                               .doOnNext( new Action1<String>() {
+                                                   @Override
+                                                   public void call( final String s ) {
+                                                       targetTypes.put( entity.entityId, s );
+                                                   }
+                                               } ).toBlocking().lastOrDefault( null );
+
+                                             allEntities.remove( entity.entityId );
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+
+
+        //perform the migration
+        graphShardVersionMigration.migrate( progressObserver );
+
+        assertEquals("Newly saved entities encounterd", 0, allEntities.size());
+        assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
+        assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
+
+
+        //write the status and version, then invalidate the cache so it appears
+        migrationInfoSerialization.setStatusCode( DataMigrationManagerImpl.StatusCode.COMPLETE.status );
+        migrationInfoSerialization.setVersion( graphShardVersionMigration.getVersion() );
+        dataMigrationManager.invalidate();
+
+        assertEquals("New version saved, and we should get new implementation", graphShardVersionMigration.getVersion(), dataMigrationManager.getCurrentVersion());
+
+
+        //now visit all nodes in the system and remove their types from the multimaps; both should be empty at the end
+        AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache )
+                                     .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+                                         @Override
+                                         public void call( final AllEntitiesInSystemObservable.EntityData entity ) {
+
+                                             final GraphManager gm =
+                                                     managerCache.getGraphManager( entity.applicationScope );
+
+                                             /**
+                                              * Get our edge types from the source
+                                              */
+                                             gm.getEdgeTypesFromSource(
+                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
+                                               .doOnNext( new Action1<String>() {
+                                                   @Override
+                                                   public void call( final String s ) {
+                                                       sourceTypes.remove( entity.entityId, s );
+                                                   }
+                                               } ).toBlocking().lastOrDefault( null );
+
+
+                                             /**
+                                              * Get the edge types to the target
+                                              */
+                                             gm.getEdgeTypesToTarget(
+                                                     new SimpleSearchEdgeType( entity.entityId, null, null ) )
+                                               .doOnNext( new Action1<String>() {
+                                                   @Override
+                                                   public void call( final String s ) {
+                                                       targetTypes.remove( entity.entityId, s );
+                                                   }
+                                               } ).toBlocking().lastOrDefault( null );
+                                         }
+                                     } ).toBlocking().lastOrDefault( null );
+
+        assertEquals( "All source types migrated", 0, sourceTypes.size() );
+        assertEquals( "All target types migrated", 0, targetTypes.size() );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
new file mode 100644
index 0000000..c7b69a1
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/TestProgressObserver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+
+
+/** Test observer that remembers update messages and whether any failure callback fired */
+public class TestProgressObserver implements DataMigration.ProgressObserver {
+
+    /** Messages handed to update(), in arrival order */
+    private List<String> messages = new ArrayList<>( 100 );
+
+    /** Set by either failed() overload; nothing in this class clears it */
+    private boolean failureSeen = false;
+
+    @Override
+    public void update( final int migrationVersion, final String message ) {
+        this.messages.add( message );
+    }
+
+
+    @Override
+    public void failed( final int migrationVersion, final String reason ) {
+        this.failureSeen = true;
+    }
+
+    @Override
+    public void failed( final int migrationVersion, final String reason, final Throwable throwable ) {
+        this.failureSeen = true;
+    }
+
+
+    /**
+     * Get update messages
+     * @return the backing list itself (not a copy) of every message passed to update
+     */
+    public List<String> getUpdates() {
+        return this.messages;
+    }
+
+
+    /**
+     * Get if we failed
+     * @return true once either failed overload has been called
+     */
+    public boolean getFailed() {
+        return this.failureSeen;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index c6a4572..423dc1f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
@@ -63,8 +64,8 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
         final String type2 = "type2thing";
         final int size = 10;
 
-        final Set<Id> type1Identities = createTypes( em, type1, size );
-        final Set<Id> type2Identities = createTypes( em, type2, size );
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( em, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( em, type2, size );
 
         //create a connection and put that in our connection types
         final Id source = type1Identities.iterator().next();
@@ -136,18 +137,4 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
         assertEquals( "Every connection should have been encountered", 0, connections.size() );
     }
 
-
-    private Set<Id> createTypes( final EntityManager em, final String type, final int size ) throws Exception {
-
-        final Set<Id> identities = new HashSet<>();
-
-        for ( int i = 0; i < size; i++ ) {
-            final Entity entity = em.create( type, new HashMap<String, Object>(){{put("property", "value");}} );
-            final Id createdId = new SimpleId( entity.getUuid(), entity.getType() );
-
-            identities.add( createdId );
-        }
-
-        return identities;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 4fa669c..2aa7fbc 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
@@ -70,7 +71,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
 
 
 
-        final Set<Id> sourceIdentities = createTypes( em, type2, size );
+        final Set<Id> sourceIdentities = EntityWriteHelper.createTypes( em, type2, size );
 
 
         final Entity entity = em.create( type1, new HashMap<String, Object>(){{put("property", "value");}} );
@@ -136,17 +137,4 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
     }
 
 
-    private Set<Id> createTypes( final EntityManager em, final String type, final int size ) throws Exception {
-
-        final Set<Id> identities = new HashSet<>();
-
-        for ( int i = 0; i < size; i++ ) {
-            final Entity entity = em.create( type, new HashMap<String, Object>(){{put("property", "value");}} );
-            final Id createdId = new SimpleId( entity.getUuid(), entity.getType() );
-
-            identities.add( createdId );
-        }
-
-        return identities;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index cd84e52..ef0d953 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
@@ -64,8 +65,8 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
         final String type2 = "type2things";
         final int size = 10;
 
-        final Set<Id> type1Identities = createTypes( em, type1, size );
-        final Set<Id> type2Identities = createTypes( em, type2, size );
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( em, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( em, type2, size );
 
         //create a connection and put that in our connection types
         final Id source = type1Identities.iterator().next();
@@ -146,17 +147,4 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
     }
 
 
-    private Set<Id> createTypes( final EntityManager em, final String type, final int size ) throws Exception {
-
-        final Set<Id> identities = new HashSet<>();
-
-        for ( int i = 0; i < size; i++ ) {
-            final Entity entity = em.create( type, new HashMap<String, Object>(){{put("property", "value");}} );
-            final Id createdId = new SimpleId( entity.getUuid(), entity.getType() );
-
-            identities.add( createdId );
-        }
-
-        return identities;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index 77949b9..cde8866 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.corepersistence.CpSetup;
+import org.apache.usergrid.corepersistence.EntityWriteHelper;
 import org.apache.usergrid.corepersistence.ManagerCache;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.Entity;
@@ -65,8 +66,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
         final String type2 = "type2thing";
         final int size = 10;
 
-        final Set<Id> type1Identities = createTypes( em, type1, size );
-        final Set<Id> type2Identities = createTypes( em, type2, size );
+        final Set<Id> type1Identities = EntityWriteHelper.createTypes( em, type1, size );
+        final Set<Id> type2Identities = EntityWriteHelper.createTypes( em, type2, size );
 
         //create a connection and put that in our connection types
         final Id source = type1Identities.iterator().next();
@@ -127,17 +128,4 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
     }
 
 
-    private Set<Id> createTypes( final EntityManager em, final String type, final int size ) throws Exception {
-
-        final Set<Id> identities = new HashSet<>();
-
-        for ( int i = 0; i < size; i++ ) {
-            final Entity entity = em.create( type, new HashMap<String, Object>(){{put("property", "value");}} );
-            final Id createdId = new SimpleId( entity.getUuid(), entity.getType() );
-
-            identities.add( createdId );
-        }
-
-        return identities;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
index 3357ed4..e36c40b 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManager.java
@@ -47,6 +47,11 @@ public interface DataMigrationManager {
      */
     public int getCurrentVersion();
 
+    /**
+     * Invalidate the cache for versions
+     */
+    public void invalidate();
+
 
     /**
      * Return that last status of the migration

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index cf0d15d..9ab388f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -168,6 +168,12 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
 
 
     @Override
+    public void invalidate() {
+        versionCache.invalidateAll(); // drops whatever versionCache holds; getCurrentVersion() reads from this same cache
+    }
+
+
+    @Override
     public int getCurrentVersion() {
         try {
             return versionCache.get( "currentVersion" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
index 639f345..1b38d3c 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapSerializationImpl.java
@@ -75,7 +75,7 @@ public class MapSerializationImpl implements MapSerialization {
         /**
          * CFs where the row key contains the source node id
          */
-        private static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
+        public static final MultiTennantColumnFamily<ScopedRowKey<MapEntryKey>, Boolean>
             MAP_ENTRIES = new MultiTennantColumnFamily<>(
                 "Map_Entries", MAP_ENTRY_SERIALIZER, BOOLEAN_SERIALIZER );
 
@@ -83,7 +83,7 @@ public class MapSerializationImpl implements MapSerialization {
         /**
          * CFs where the row key contains the source node id
          */
-        private static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
+        public static final MultiTennantColumnFamily<BucketScopedRowKey<String>, String> MAP_KEYS =
                 new MultiTennantColumnFamily<>( "Map_Keys", MAP_KEY_SERIALIZER, STRING_SERIALIZER );
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 1a6adb6..88498b3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -57,6 +57,12 @@ public interface EntityIndex {
     public void refresh();
 
     /**
+     * Return the number of pending tasks in the cluster
+     * @return the pending task count
+     */
+    public int getPendingTasks();
+
+    /**
      * Check health of cluster.
      */
     public Health getClusterHealth();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index a2c3090..b1e5374 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
+import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
@@ -408,6 +410,15 @@ public class EsEntityIndexImpl implements EntityIndex {
 
 
     @Override
+    public int getPendingTasks() {
+        // ask the cluster admin client for its pending cluster tasks; only the size of the returned list is used
+        final PendingClusterTasksRequest request = new PendingClusterTasksRequest();
+        final PendingClusterTasksResponse pending = esProvider.getClient().admin().cluster().pendingClusterTasks( request ).actionGet();
+        return pending.pendingTasks().size();
+    }
+
+
+    @Override
     public CandidateResults getEntityVersions( final IndexScope scope, final Id id ) {
 
         //since we don't have paging inputs, there's no point in executing a query for paging.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index 18c3494..51a4aee 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -66,7 +66,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
     @Test
-    public void testBasicOperation() throws IOException {
+    public void testBasicOperation() throws IOException, InterruptedException {
 
         Id appId = new SimpleId( "application" );
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -137,6 +137,10 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.executeAndRefresh();
         personLikesIndex.refresh();
 
+
+        EsTestUtils.waitForTasks(personLikesIndex);
+        Thread.sleep( 1000 );
+
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex
                 .search( searchScope, SearchTypes.fromTypes( muffin.getId().getType() ), Query.fromQL( "select *" ) );
@@ -192,7 +196,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
 
 
     @Test
-    public void testDelete() throws IOException {
+    public void testDelete() throws IOException, InterruptedException {
 
         Id appId = new SimpleId( "application" );
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
@@ -263,6 +267,9 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.executeAndRefresh();
         personLikesIndex.refresh();
 
+        EsTestUtils.waitForTasks( personLikesIndex );
+        Thread.sleep( 1000 );
+
         // now, let's search for muffins
         CandidateResults likes = personLikesIndex.search( searchScope,
                 SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3775eb2d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
new file mode 100644
index 0000000..30f0ed0
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EsTestUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.index.impl;
+
+
+import org.apache.usergrid.persistence.index.EntityIndex;
+
+
+/**
+ * Utilities to make testing ES easier
+ */
+public class EsTestUtils {
+
+    /**
+     * Polls the index every 100ms until it reports no pending cluster tasks.  Adding new types can cause lag even
+     * after refresh since the type mapping needs to be applied.  An interrupt ends the wait with the flag re-set.
+     * @param index the index whose getPendingTasks() count is polled
+     */
+    public static void waitForTasks(final EntityIndex index){
+        while(index.getPendingTasks() > 0){
+            try {
+                Thread.sleep( 100 );
+            }
+            catch ( InterruptedException e ) {
+                // don't swallow: restore the flag for the caller and stop, a set flag makes each later sleep throw
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+}