You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/12/16 15:59:56 UTC

[26/50] incubator-usergrid git commit: Merge branch 'two-dot-o' into two-dot-o-events

Merge branch 'two-dot-o' into two-dot-o-events

Still need to update code to match new methods of custom fields and remove Query objects

Conflicts:
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
	stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
	stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
	stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
	stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/717eb15d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/717eb15d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/717eb15d

Branch: refs/heads/no-source-in-es
Commit: 717eb15d581c89fe5239c15733bae0588e7da25d
Parents: 1f71cf6 66ab376
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Nov 17 13:11:38 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Nov 17 13:11:38 2014 -0700

----------------------------------------------------------------------
 stack/awscluster/src/main/cql/create_locks.cql  |   26 -
 .../awscluster/src/main/cql/create_usergrid.cql |   28 -
 .../main/cql/create_usergrid_applications.cql   |   47 -
 stack/awscluster/src/main/cql/update_locks.cql  |   23 +
 .../awscluster/src/main/cql/update_usergrid.cql |   27 +
 .../main/cql/update_usergrid_applications.cql   |   46 +
 .../main/dist/init_instance/create_keyspaces.sh |   61 -
 .../main/dist/init_instance/init_db_server.sh   |    4 -
 .../main/dist/init_instance/init_rest_server.sh |   28 +-
 .../main/dist/init_instance/install_yourkit.sh  |    6 +-
 .../main/dist/init_instance/update_keyspaces.sh |   66 +
 .../src/main/groovy/NodeRegistry.groovy         |    8 +-
 .../src/main/groovy/configure_usergrid.groovy   |    2 +-
 stack/awscluster/ugcluster-cf.json              |  246 +++-
 .../main/resources/usergrid-default.properties  |    2 +-
 .../src/test/resources/usergrid-test.properties |    6 +-
 .../corepersistence/CpEntityManager.java        | 1292 ++++++++----------
 .../corepersistence/CpEntityManagerFactory.java |  134 +-
 .../corepersistence/CpManagerCache.java         |  132 +-
 .../corepersistence/CpRelationManager.java      |  542 ++++----
 .../usergrid/corepersistence/CpSetup.java       |   29 +-
 .../usergrid/corepersistence/CpWalker.java      |    9 +-
 .../usergrid/corepersistence/GuiceModule.java   |    2 +
 .../HybridEntityManagerFactory.java             |   15 +-
 .../usergrid/corepersistence/ManagerCache.java  |   69 +
 .../migration/EntityTypeMappingMigration.java   |  103 ++
 .../migration/GraphShardVersionMigration.java   |  125 +-
 .../corepersistence/migration/Versions.java     |    9 +-
 .../results/FilteringLoader.java                |   33 +-
 .../results/ResultsLoaderFactory.java           |    9 +-
 .../results/ResultsLoaderFactoryImpl.java       |   13 +-
 .../rx/AllEntitiesInSystemObservable.java       |   96 ++
 .../rx/ApplicationObservable.java               |  117 ++
 .../rx/EdgesFromSourceObservable.java           |   63 +
 .../rx/EdgesToTargetObservable.java             |   63 +
 .../corepersistence/rx/TargetIdObservable.java  |   66 +
 .../corepersistence/util/CpNamingUtils.java     |   78 +-
 .../usergrid/persistence/EntityManager.java     |    3 +
 .../persistence/EntityManagerFactory.java       |   12 +
 .../usergrid/persistence/SimpleEntityRef.java   |    6 +
 .../persistence/cassandra/CassandraService.java |    3 +
 .../cassandra/EntityManagerFactoryImpl.java     |   16 +-
 .../cassandra/RelationManagerImpl.java          |   18 +-
 .../apache/usergrid/ConcurrentCoreITSuite.java  |    2 +-
 .../usergrid/ConcurrentCoreIteratorITSuite.java |    4 +-
 .../org/apache/usergrid/CoreApplication.java    |    7 +-
 .../org/apache/usergrid/CoreITSetupImpl.java    |    1 -
 .../java/org/apache/usergrid/CoreITSuite.java   |    4 +-
 .../batch/job/AbstractSchedulerRuntimeIT.java   |    7 +-
 .../usergrid/batch/job/SchedulerRuntime1IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime2IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime3IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime4IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime5IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime6IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime7IT.java |    2 +
 .../usergrid/batch/job/SchedulerRuntime8IT.java |    2 +-
 .../corepersistence/EntityWriteHelper.java      |   59 +
 .../corepersistence/StaleIndexCleanupTest.java  |  182 +--
 .../migration/EntityTypeMappingMigrationIT.java |  158 +++
 .../migration/GraphShardVersionMigrationIT.java |  212 +++
 .../migration/TestProgressObserver.java         |   71 +
 .../rx/AllEntitiesInSystemObservableIT.java     |  140 ++
 .../rx/ApplicationObservableTestIT.java         |   82 ++
 .../rx/EdgesFromSourceObservableIT.java         |  140 ++
 .../rx/EdgesToTargetObservableIT.java           |  150 ++
 .../rx/TargetIdObservableTestIT.java            |  131 ++
 .../usergrid/persistence/CollectionIT.java      |    3 +-
 .../persistence/EntityConnectionsIT.java        |   43 +-
 .../org/apache/usergrid/persistence/GeoIT.java  |   34 +-
 .../PerformanceEntityRebuildIndexTest.java      |    6 +-
 .../apache/usergrid/persistence/QueryTest.java  |   15 +
 .../query/AbstractIteratingQueryIT.java         |   20 +-
 .../query/AllInConnectionNoTypeIT.java          |    2 +-
 .../resources/usergrid-custom-test.properties   |    4 +-
 .../migration/data/DataMigrationManager.java    |   11 +
 .../data/DataMigrationManagerImpl.java          |   26 +-
 .../data/MigrationInfoSerializationImpl.java    |    2 +-
 .../core/astyanax/ColumnNameIteratorTest.java   |   21 +-
 .../MultiKeyColumnNameIteratorTest.java         |   20 +-
 .../astyanax/MultiRowColumnIteratorTest.java    |   20 +-
 .../core/cassandra/CassandraRule.java           |  114 --
 .../data/MigrationInfoSerializationTest.java    |   13 +-
 .../persistence/core/test/ITRunner.java         |   20 +-
 .../EdgeMetadataSerializationProxyImpl.java     |    2 +-
 .../graph/CommittedGraphManagerIT.java          |    2 +-
 .../persistence/graph/GraphManagerLoadTest.java |    4 +-
 .../graph/GraphManagerShardConsistencyIT.java   |    6 -
 .../graph/GraphManagerShardingIT.java           |    4 +-
 .../graph/GraphManagerStressTest.java           |    4 +-
 .../graph/StorageGraphManagerIT.java            |    2 +-
 .../graph/guice/TestGraphModule.java            |    2 +-
 .../graph/impl/EdgeDeleteListenerTest.java      |    6 +-
 .../graph/impl/NodeDeleteListenerTest.java      |    6 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |    4 +-
 .../graph/impl/stage/EdgeMetaRepairTest.java    |    6 +-
 .../EdgeMetaDataSerializationProxyV2Test.java   |    7 +-
 .../EdgeMetaDataSerializationV1Test.java        |    4 +-
 .../EdgeMetaDataSerializationV2Test.java        |    4 +-
 .../EdgeSerializationChopTest.java              |    4 +-
 .../serialization/EdgeSerializationTest.java    |    7 +-
 .../serialization/NodeSerializationTest.java    |    4 +-
 .../PermanentSerializationTest.java             |    2 +-
 .../impl/shard/EdgeShardSerializationTest.java  |    9 +-
 .../NodeShardCounterSerializationTest.java      |    8 +-
 .../map/impl/MapSerializationImpl.java          |    4 +-
 stack/corepersistence/pom.xml                   |    2 +-
 .../usergrid/persistence/index/EntityIndex.java |   10 +-
 .../usergrid/persistence/index/SearchTypes.java |  130 ++
 .../index/impl/EsEntityIndexBatchImpl.java      |  138 +-
 .../index/impl/EsEntityIndexImpl.java           |  190 ++-
 .../persistence/index/impl/EsQueryVistor.java   |    3 +-
 .../persistence/index/impl/IndexScopeImpl.java  |   14 +-
 .../persistence/index/impl/IndexingUtils.java   |  107 +-
 .../usergrid/persistence/index/query/Query.java |   76 +-
 .../index/impl/CorePerformanceIT.java           |   40 +-
 .../index/impl/ElasticSearchResource.java       |  115 +-
 .../impl/EntityConnectionIndexImplTest.java     |  249 +++-
 .../persistence/index/impl/EntityIndexTest.java |  115 +-
 .../persistence/index/impl/EsTestUtils.java     |   48 +
 .../persistence/query/tree/GrammarTreeTest.java |    2 +-
 .../src/test/resources/dynamic-test.properties  |    2 +-
 .../src/test/resources/usergrid-UNIT.properties |    2 +-
 .../usergrid/launcher/EmbeddedServerHelper.java |    4 +-
 stack/loadtests/README.md                       |    6 +-
 .../datagenerators/EntityDataGenerator.scala    |    2 +-
 .../datagenerators/FeederGenerator.scala        |   56 +-
 .../usergrid/scenarios/DeviceScenarios.scala    |   34 +-
 .../scenarios/NotificationScenarios.scala       |   14 +-
 .../usergrid/scenarios/UserScenarios.scala      |   54 +-
 .../org/apache/usergrid/settings/Settings.scala |   17 +-
 .../usergrid/simulations/AppSimulation.scala    |    8 +-
 .../simulations/ConnectionsSimulation.scala     |    2 +-
 .../simulations/GetEntitySimulation.scala       |    2 +-
 .../PushNotificationTargetUserSimulation.scala  |    2 +-
 .../usergrid/simulations/SetupSimulation.scala  |   45 +
 stack/pom.xml                                   |    7 -
 .../apache/usergrid/rest/MigrateResource.java   |  165 +++
 .../apache/usergrid/rest/SystemResource.java    |   63 +-
 .../apache/usergrid/rest/AbstractRestIT.java    |   22 +-
 .../java/org/apache/usergrid/rest/BasicIT.java  |  124 +-
 .../java/org/apache/usergrid/rest/ITSetup.java  |   16 +-
 .../org/apache/usergrid/rest/RestITSuite.java   |    2 +-
 .../apache/usergrid/rest/TestContextSetup.java  |   10 +-
 .../rest/management/ExportResourceIT.java       |  333 +++--
 .../rest/management/ManagementResourceIT.java   |  143 +-
 .../rest/management/OrganizationsIT.java        |  327 +++--
 .../rest/management/RegistrationIT.java         |   61 +-
 .../rest/test/resource/OrgUserUUIDWrapper.java  |   29 +
 .../rest/test/resource/TestContext.java         |   21 +-
 .../rest/test/resource/TestOrganization.java    |   39 +
 .../resource/mgmt/OrganizationsCollection.java  |   28 +-
 .../usergrid/rest/test/security/TestUser.java   |    5 +
 .../resources/usergrid-custom-test.properties   |    4 +-
 .../org/apache/usergrid/ServiceApplication.java |   18 +-
 .../org/apache/usergrid/ServiceITSuite.java     |    2 +-
 .../usergrid/services/ConnectionsServiceIT.java |    4 +-
 .../usergrid/services/GroupServiceIT.java       |    4 +-
 .../usergrid/services/RolesServiceIT.java       |    2 +-
 .../usergrid/services/UsersServiceIT.java       |    2 +-
 .../AbstractServiceNotificationIT.java          |   12 +-
 .../notifications/NotifiersServiceIT.java       |    2 +-
 .../apns/NotificationsServiceIT.java            |  132 +-
 .../gcm/NotificationsServiceIT.java             |   28 +-
 .../resources/usergrid-custom-test.properties   |    4 +-
 stack/test-utils/pom.xml                        |    5 -
 .../usergrid/cassandra/CassandraResource.java   |  648 ++-------
 .../cassandra/CassandraResourceTest.java        |    7 +-
 .../resources/usergrid-custom-test.properties   |    2 +-
 169 files changed, 5757 insertions(+), 3527 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/717eb15d/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 2d0673f,2cb01d4..8723968
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@@ -2645,15 -2534,17 +2534,17 @@@ public class CpEntityManager implement
          }
  
          try {
-             logger.debug("About to Write {}:{} version {}", new Object[] { 
-                 cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
+             logger.debug( "About to Write {}:{} version {}", new Object[] {
+                     cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
+             } );
  
-              cpEntity = ecm.write( cpEntity ).toBlocking().last();
 -            cpEntity = ecm.write( cpEntity ).toBlocking().last();
++             cpEntity = ecm .write( cpEntity ).toBlocking().last();
  
-             logger.debug("Wrote {}:{} version {}", new Object[] { 
-                 cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
+             logger.debug( "Wrote {}:{} version {}", new Object[] {
+                     cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
+             } );
  
-             entityCache.put( new EntityScope( collectionScope, cpEntity.getId() ), cpEntity);
+             entityCache.put( new EntityScope( collectionScope, cpEntity.getId() ), cpEntity );
          }
          catch ( WriteUniqueVerifyException wuve ) {
              handleWriteUniqueVerifyException( entity, wuve );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/717eb15d/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 4268478,2d80672..2697958
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@@ -18,13 -19,9 +18,14 @@@ package org.apache.usergrid.corepersist
  import com.google.inject.AbstractModule;
  import com.google.inject.multibindings.Multibinder;
  
+ import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
  import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
 +import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
 +import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
 +import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
 +import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 +import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
 +import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
  import org.apache.usergrid.persistence.collection.guice.CollectionModule;
  import org.apache.usergrid.persistence.core.guice.CommonModule;
  import org.apache.usergrid.persistence.core.migration.data.DataMigration;
@@@ -53,19 -49,12 +54,20 @@@ public class GuiceModule extends Abstra
          install(new MapModule());
          install(new QueueModule());
  
 -        bind(CpEntityDeleteListener.class).asEagerSingleton();
 -        bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
 -        bind(ManagerCache.class).to( CpManagerCache.class );
 +        Multibinder<EntityDeleted> entityBinder
 +                = Multibinder.newSetBinder(binder(), EntityDeleted.class);
 +        entityBinder.addBinding().to(EntityDeletedHandler.class);
 +
 +        Multibinder<EntityVersionDeleted> versionBinder
 +                = Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
 +        versionBinder.addBinding().to(EntityVersionDeletedHandler.class);
 +
 +        Multibinder<EntityVersionCreated> versionCreatedMultibinder
 +                = Multibinder.newSetBinder( binder(),EntityVersionCreated.class );
 +        versionCreatedMultibinder.addBinding().to(EntityVersionCreatedHandler.class);
  
          Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+         dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
          dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
  
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/717eb15d/stack/core/src/main/java/org/apache/usergrid/corepersistence/HybridEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/717eb15d/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index dff2f59,fa9f9df..95d77d0
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@@ -51,38 -56,14 +57,29 @@@ import static org.apache.usergrid.persi
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
  
- import org.junit.Before;
- import org.junit.After;
- import org.junit.Test;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
  
 +//need to create system properties in test that can get applied
  /**
   * Test on read style clean-up of stale ElasticSearch indexes.
   */
  public class StaleIndexCleanupTest extends AbstractCoreIT {
-     private static final Logger logger = LoggerFactory.getLogger(StaleIndexCleanupTest.class );
+     private static final Logger logger = LoggerFactory.getLogger( StaleIndexCleanupTest.class );
  
-     // take it easy on embedded Cassandra
-     private static final long writeDelayMs = 50;
-     private static final long readDelayMs = 50;
  
 +    Lock sequential = new ReentrantLock();
 +
 +    @Before
 +    public void before() {
 +
 +        // if tests run in parallel there will likely be a conflict over the allow.stale.entities
 +        sequential.lock();
 +    }
 +
 +    @After
 +    public void after() {
 +        System.clearProperty( "allow.stale.entities" );
 +
 +    }
  
      /**
       * Test that updating an entity causes the entity's version number to change.
@@@ -206,11 -196,10 +212,11 @@@
  
                  //last entities appear first
                  final Entity expected = maxVersions.get( index );
 -                assertEquals( "correct entity returned", expected, returned );
 +                assertEquals("correct entity returned", expected, returned);
 +
              }
- 
-         } while ( cursor != null );
+         }
+         while ( cursor != null );
  
          assertEquals( "Expect no stale candidates", numEntities, thingCount );
  
@@@ -227,142 -214,6 +231,143 @@@
  
  
      /**
 +     * Test that the EntityDeleteImpl cleans up stale indexes on delete. Ensures that when an 
 +     * entity is deleted its old indexes are cleared from ElasticSearch.
 +     */
 +    @Test(timeout=10000)
 +    public void testCleanupOnDelete() throws Exception {
 +
 +        logger.info("Started testStaleIndexCleanup()");
 +
 +        // turn off post processing stuff that cleans up stale entities 
 +        System.setProperty( "allow.stale.entities", "true" );
 +
 +        final EntityManager em = app.getEntityManager();
 +
 +        final int numEntities = 10;
 +        final int numUpdates = 3;
 +
 +        // create lots of entities
 +        final List<Entity> things = new ArrayList<Entity>(numEntities);
 +        for ( int i=0; i<numEntities; i++) {
 +            final String thingName = "thing" + i;
 +            things.add( em.create("thing", new HashMap<String, Object>() {{
 +                put("name", thingName);
 +            }}));
 +            Thread.sleep( writeDelayMs );
 +        }
 +        em.refreshIndex();
 +
 +        CandidateResults crs = queryCollectionCp( "things", "select *");
 +        Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
 +
 +        // update each one a bunch of times
 +        int count = 0;
 +
 +        List<Entity> maxVersions = new ArrayList<>(numEntities);
 +
 +        for ( Entity thing : things ) {
 +            Entity toUpdate = null;
 +
 +            for ( int j=0; j<numUpdates; j++) {
 +                toUpdate = em.get( thing.getUuid() );
 +                toUpdate.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
 +
 +                em.update(toUpdate);
 +
 +                Thread.sleep( writeDelayMs );
 +                count++;
 +                if ( count % 100 == 0 ) {
 +                    logger.info("Updated {} of {} times", count, numEntities * numUpdates);
 +                }
 +            }
 +
 +            maxVersions.add( toUpdate );
 +        }
 +        em.refreshIndex();
 +
 +        // query Core Persistence directly for total number of result candidates
 +        crs = queryCollectionCp("things", "select *");
 +        Assert.assertEquals( "Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
 +
 +        // delete all entities
 +        for ( Entity thing : things ) {
 +            em.delete( thing );
 +        }
 +        em.refreshIndex();
 +
 +        // wait for indexes to be cleared for the deleted entities
 +        count = 0;
 +        do {
 +            Thread.sleep(100);
 +            crs = queryCollectionCp("things", "select *");
 +        } while ( crs.size() > 0 && count++ < 14 );
 +
 +        Assert.assertEquals( "Expect no candidates", 0, crs.size() );
 +    }
 +
 +    
 +    /**
 +     * Test that the EntityDeleteImpl cleans up stale indexes on update. Ensures that when an 
 +     * entity is updated its old indexes are cleared from ElasticSearch.
 +     */
 +    @Test(timeout=10000)
 +    public void testCleanupOnUpdate() throws Exception {
 +
 +        logger.info( "Started testCleanupOnUpdate()" );
 +
 +        final EntityManager em = app.getEntityManager();
 +
 +        final int numEntities = 10;
 +        final int numUpdates = 3;
 +
 +        // create lots of entities
 +        final List<Entity> things = new ArrayList<Entity>(numEntities);
 +        for ( int i=0; i<numEntities; i++) {
 +            final String thingName = "thing" + i;
 +            things.add( em.create("thing", new HashMap<String, Object>() {{
 +                put("name", thingName);
 +            }}));
 +            Thread.sleep( writeDelayMs );
 +        }
 +        em.refreshIndex();
 +
 +        CandidateResults crs = queryCollectionCp( "things", "select *");
 +        Assert.assertEquals( "Expect no stale candidates yet", numEntities, crs.size() );
 +
 +        // update each one a bunch of times
 +        int count = 0;
 +
 +        List<Entity> maxVersions = new ArrayList<>(numEntities);
 +
 +        for ( Entity thing : things ) {
 +            Entity toUpdate = null;
 +
 +            for ( int j=0; j<numUpdates; j++) {
 +                toUpdate = em.get( thing.getUuid() );
 +                toUpdate.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
 +
 +                em.update(toUpdate);
 +
 +                Thread.sleep( writeDelayMs );
 +                count++;
 +                if ( count % 100 == 0 ) {
 +                    logger.info("Updated {} of {} times", count, numEntities * numUpdates);
 +                }
 +            }
 +
 +            maxVersions.add( toUpdate );
 +        }
 +        em.refreshIndex();
 +
 +        // query Core Persistence directly for total number of result candidates
 +        crs = queryCollectionCp("things", "select *");
 +        Assert.assertEquals( "Expect candidates without earlier stale entities", numEntities, crs.size() );
 +    }
 +
 +    
 +    /** 
++    /**
       * Go around EntityManager and get directly from Core Persistence.
       */
      private org.apache.usergrid.persistence.model.entity.Entity getCpEntity( EntityRef eref ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/717eb15d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index b1e3b33,c4cdeeb..f69a64c
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -258,9 -213,10 +251,13 @@@ public class EsEntityIndexBatchImpl imp
  
          try {
              responses = request.execute().actionGet();
 +        } catch (Throwable t) {
 +            logger.error("Unable to communicate with elasticsearch");
 +            failureMonitor.fail("Unable to execute batch", t);
+         }
+         catch ( Throwable t ) {
+             log.error( "Unable to communicate with elasticsearch" );
+             failureMonitor.fail( "Unable to execute batch", t );
              throw t;
          }
  
@@@ -318,8 -293,8 +334,6 @@@
  
                  if ( !list.isEmpty() ) {
                      if ( list.get( 0 ) instanceof String ) {
--                        Joiner joiner = Joiner.on( " " ).skipNulls();
--                        String joined = joiner.join( list );
                          entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
                                  new ArrayList( processCollectionForMap( list ) ) );
                      }