You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/02 06:47:40 UTC

[1/2] git commit: Initial hack to add batch operations to ES. Needs some serious TLC when we have a bit of time.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/esbatching 1e7da5c8e -> bba08ddc1


Initial hack to add batch operations to ES.  Needs some serious TLC when we have a bit of time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fba71c2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fba71c2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fba71c2d

Branch: refs/heads/esbatching
Commit: fba71c2d71df58e1bc8dcf2b11f5a3eeb71b052e
Parents: 1e7da5c
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 20:18:26 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 20:18:26 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/EntityIndex.java |  38 +-
 .../persistence/index/EntityIndexBatch.java     |  71 +++
 .../persistence/index/EntityIndexFactory.java   |   4 +-
 .../index/impl/EsEntityIndexBatchImpl.java      | 359 +++++++++++
 .../index/impl/EsEntityIndexImpl.java           | 633 ++++---------------
 .../persistence/index/impl/EsQueryVistor.java   |  35 +-
 .../persistence/index/impl/IndexingUtils.java   | 112 ++++
 .../index/impl/CorePerformanceIT.java           |  41 +-
 .../impl/EntityConnectionIndexImplTest.java     |   9 +-
 .../persistence/index/impl/EntityIndexTest.java | 110 ++--
 10 files changed, 791 insertions(+), 621 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 2bf1e89..64a3e10 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -19,56 +19,38 @@
 
 package org.apache.usergrid.persistence.index;
 
-import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.model.entity.Id;
 
-import java.util.UUID;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-
 
 /**
  * Provides indexing of Entities within a scope.
  */
 public interface EntityIndex {
 
-    /** 
-     * Create index for Entity
-     * @param entity Entity to be indexed.
-     */
-    public void index(  Entity entity );
-
     /**
-     * Remove index of entity.
-     * @param entity Entity to be removed from index. 
+     * Create the index batch
+     * @return
      */
-    public void deindex( Entity entity );
+    public EntityIndexBatch createBatch();
 
     /**
-     * Remove index of entity.
-     * @param result CandidateResult to be removed from index.
+     * Execute query in Usergrid syntax.
      */
-    public void deindex( CandidateResult result );
 
-    /**
-     * Remove index of entity.
-     * @param id Id to be removed from index.
-     * @param version Version to be removed from index.
-     */
-    public void deindex( Id id, UUID version);
+    public CandidateResults search(final IndexScope indexScope,  Query query );
 
     /**
-     * Execute query in Usergrid syntax.
+     * Get the candidate results of all versions of the entity for this id
+     * @param id
+     * @return
      */
-
-    public CandidateResults search( Query query );
+    public CandidateResults getEntityVersions(final IndexScope indexScope, Id id);
 
     /**
-     * Force refresh of index (should be used for testing purposes only).
+     * Refresh the index
      */
     public void refresh();
 
-    public CandidateResults getEntityVersions(Id id);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
new file mode 100644
index 0000000..643174c
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -0,0 +1,71 @@
+package org.apache.usergrid.persistence.index;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public interface EntityIndexBatch {
+
+
+
+
+    /**
+     * Create index for Entity
+     * @param indexScope The scope for the index
+     * @param entity Entity to be indexed.
+     */
+    public EntityIndexBatch index( final IndexScope indexScope, final Entity entity );
+
+    /**
+     * Remove index of entity
+     * @param scope The scope for the entity
+     * @param entity Entity to be removed from index.
+     */
+    public EntityIndexBatch deindex(final IndexScope scope, final Entity entity );
+
+    /**
+     * Remove index of entity.
+     * @param scope The scope to use for removal
+     * @param result CandidateResult to be removed from index.
+     */
+    public EntityIndexBatch deindex(final IndexScope scope, final CandidateResult result );
+
+    /**
+     * Remove index of entity.
+     * @param scope The scope to remove
+     * @param id Id to be removed from index.
+     * @param version Version to be removed from index.
+     */
+    public EntityIndexBatch deindex(final IndexScope scope, final Id id, final UUID version);
+
+    /**
+     * Execute the batch
+     */
+    public void execute();
+
+    /**
+     * Execute the batch and force the refresh
+     */
+    public void executeAndRefresh();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
index 57ae6a5..1a97b5a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
@@ -20,11 +20,13 @@ package org.apache.usergrid.persistence.index;
 
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
 import com.google.inject.assistedinject.Assisted;
 
 
 public interface EntityIndexFactory {
 
     public EntityIndex createEntityIndex( 
-        @Assisted IndexScope appScope);
+        @Assisted ApplicationScope appScope);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
new file mode 100644
index 0000000..d9857f2
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -0,0 +1,359 @@
+package org.apache.usergrid.persistence.index.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.BooleanField;
+import org.apache.usergrid.persistence.model.field.DoubleField;
+import org.apache.usergrid.persistence.model.field.EntityObjectField;
+import org.apache.usergrid.persistence.model.field.Field;
+import org.apache.usergrid.persistence.model.field.FloatField;
+import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.ListField;
+import org.apache.usergrid.persistence.model.field.LocationField;
+import org.apache.usergrid.persistence.model.field.LongField;
+import org.apache.usergrid.persistence.model.field.SetField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.field.UUIDField;
+import org.apache.usergrid.persistence.model.field.value.EntityObject;
+
+import com.google.common.base.Joiner;
+
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createCollectionScopeTypeName;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexName;
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+
+public class EsEntityIndexBatchImpl implements EntityIndexBatch {
+
+    private static final Logger log = LoggerFactory.getLogger( EsEntityIndexBatchImpl.class );
+
+    private final ApplicationScope applicationScope;
+
+    private final Client client;
+
+    // Keep track of what types we have already initialized to avoid cost
+    // of attempting to init them again. Used in the initType() method.
+    private final Set<String> knownTypes;
+
+    private final boolean refresh;
+
+    private final String indexName;
+
+    private BulkRequestBuilder bulkRequest;
+
+
+    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope,
+                                   final Client client, final IndexFig config, final Set<String> knownTypes ) {
+
+        this.applicationScope = applicationScope;
+        this.client = client;
+        this.knownTypes = knownTypes;
+        this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
+        this.refresh = config.isForcedRefresh();
+        initBatch();
+    }
+
+
+    @Override
+    public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
+
+        final String indexType = createCollectionScopeTypeName( indexScope );
+
+        if ( log.isDebugEnabled() ) {
+            log.debug( "Indexing entity {}:{} in scope\n   app {}\n   owner {}\n   name {}\n   type {}", new Object[] {
+                    entity.getId().getType(), entity.getId().getUuid(), indexScope.getApplication(),
+                    indexScope.getOwner(), indexScope.getName(), indexType
+            } );
+        }
+
+        ValidationUtils.verifyEntityWrite( entity );
+
+        initType(indexScope,  indexType );
+
+        Map<String, Object> entityAsMap = entityToMap( entity );
+
+        // need prefix here becuase we index UUIDs as strings
+        entityAsMap.put( STRING_PREFIX + ENTITYID_FIELDNAME, entity.getId().getUuid().toString().toLowerCase() );
+
+        // let caller add these fields if needed
+        // entityAsMap.put("created", entity.getId().getUuid().timestamp();
+        // entityAsMap.put("updated", entity.getVersion().timestamp());
+
+        String indexId = createIndexDocId( entity );
+
+        log.debug( "Indexing entity id {} data {} ", indexId, entityAsMap );
+
+        bulkRequest.add( client.prepareIndex( indexName, indexType, indexId ).setSource( entityAsMap ) );
+
+        return this;
+    }
+
+
+    @Override
+    public EntityIndexBatch deindex(final IndexScope indexScope, final Id id, final UUID version ) {
+
+        final String indexType = createCollectionScopeTypeName( indexScope );
+
+        if ( log.isDebugEnabled() ) {
+            log.debug( "De-indexing entity {}:{} in scope\n   app {}\n   owner {}\n   name {} type {}", new Object[] {
+                    id.getType(), id.getUuid(), indexScope.getApplication(), indexScope.getOwner(),
+                    indexScope.getName(), indexType
+            } );
+        }
+
+        String indexId = createIndexDocId( id, version );
+
+        bulkRequest.add( client.prepareDelete( indexName, indexType, indexId ).setRefresh( refresh ) );
+
+
+        log.debug( "Deindexed Entity with index id " + indexId );
+
+        return this;
+    }
+
+
+    @Override
+    public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) {
+
+       return  deindex(indexScope,  entity.getId(), entity.getVersion() );
+    }
+
+
+    @Override
+    public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateResult entity ) {
+
+        return deindex(indexScope,  entity.getId(), entity.getVersion() );
+    }
+
+
+    @Override
+    public void execute() {
+        execute( bulkRequest.setRefresh( refresh ) );
+    }
+
+
+    /**
+     * Execute the request, check for errors, then re-init the batch for future use
+     * @param request
+     */
+    private void execute( final BulkRequestBuilder request ) {
+        final BulkResponse response = request.execute().actionGet();
+
+        if ( response.hasFailures() ) {
+            throw new RuntimeException( "Unable to index documents.  Errors are :" + response.buildFailureMessage() );
+        }
+
+        initBatch();
+    }
+
+
+    @Override
+    public void executeAndRefresh() {
+        execute(bulkRequest.setRefresh( true ) );
+    }
+
+
+    /**
+     * Create ElasticSearch mapping for each type of Entity.
+     */
+    private void initType( final IndexScope indexScope, String typeName ) {
+
+        // no need for synchronization here, it's OK if we init attempt to init type multiple times
+        if ( knownTypes.contains( typeName ) ) {
+            return;
+        }
+
+        AdminClient admin = client.admin();
+        try {
+            XContentBuilder mxcb = EsEntityIndexImpl.createDoubleStringIndexMapping( jsonBuilder(), typeName );
+
+            admin.indices().preparePutMapping( indexName ).setType( typeName ).setSource( mxcb ).execute().actionGet();
+
+            admin.indices().prepareGetMappings( indexName ).addTypes( typeName ).execute().actionGet();
+
+            //            log.debug("Created new type mapping");
+            //            log.debug("   Scope application: " + indexScope.getApplication());
+            //            log.debug("   Scope owner: " + indexScope.getOwner());
+            //            log.debug("   Type name: " + typeName);
+
+            knownTypes.add( typeName );
+        }
+        catch ( IndexAlreadyExistsException ignored ) {
+            // expected
+        }
+        catch ( IOException ex ) {
+            throw new RuntimeException(
+                    "Exception initing type " + typeName + " in app " + indexScope.getApplication().toString() );
+        }
+    }
+
+
+    /**
+     * Convert Entity to Map. Adding prefixes for types:
+     *
+     * su_ - String unanalyzed field sa_ - String analyzed field go_ - Location field nu_ - Number field bu_ - Boolean
+     * field
+     */
+    private static Map entityToMap( EntityObject entity ) {
+
+        Map<String, Object> entityMap = new HashMap<String, Object>();
+
+        for ( Object f : entity.getFields().toArray() ) {
+
+            Field field = ( Field ) f;
+
+            if ( f instanceof ListField ) {
+                List list = ( List ) field.getValue();
+                entityMap.put( field.getName().toLowerCase(), new ArrayList( processCollectionForMap( list ) ) );
+
+                if ( !list.isEmpty() ) {
+                    if ( list.get( 0 ) instanceof String ) {
+                        Joiner joiner = Joiner.on( " " ).skipNulls();
+                        String joined = joiner.join( list );
+                        entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
+                                new ArrayList( processCollectionForMap( list ) ) );
+                    }
+                }
+            }
+            else if ( f instanceof ArrayField ) {
+                List list = ( List ) field.getValue();
+                entityMap.put( field.getName().toLowerCase(), new ArrayList( processCollectionForMap( list ) ) );
+            }
+            else if ( f instanceof SetField ) {
+                Set set = ( Set ) field.getValue();
+                entityMap.put( field.getName().toLowerCase(), new ArrayList( processCollectionForMap( set ) ) );
+            }
+            else if ( f instanceof EntityObjectField ) {
+                EntityObject eo = ( EntityObject ) field.getValue();
+                entityMap.put( field.getName().toLowerCase(), entityToMap( eo ) ); // recursion
+
+                // Add type information as field-name prefixes
+
+            }
+            else if ( f instanceof StringField ) {
+
+                // index in lower case because Usergrid queries are case insensitive
+                entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
+                        ( ( String ) field.getValue() ).toLowerCase() );
+                entityMap.put( STRING_PREFIX + field.getName().toLowerCase(),
+                        ( ( String ) field.getValue() ).toLowerCase() );
+            }
+            else if ( f instanceof LocationField ) {
+                LocationField locField = ( LocationField ) f;
+                Map<String, Object> locMap = new HashMap<String, Object>();
+
+                // field names lat and lon trigger ElasticSearch geo location
+                locMap.put( "lat", locField.getValue().getLatitude() );
+                locMap.put( "lon", locField.getValue().getLongtitude() );
+                entityMap.put( GEO_PREFIX + field.getName().toLowerCase(), locMap );
+            }
+            else if ( f instanceof DoubleField || f instanceof FloatField || f instanceof IntegerField
+                    || f instanceof LongField ) {
+
+                entityMap.put( NUMBER_PREFIX + field.getName().toLowerCase(), field.getValue() );
+            }
+            else if ( f instanceof BooleanField ) {
+
+                entityMap.put( BOOLEAN_PREFIX + field.getName().toLowerCase(), field.getValue() );
+            }
+            else if ( f instanceof UUIDField ) {
+
+                entityMap.put( STRING_PREFIX + field.getName().toLowerCase(), field.getValue().toString() );
+            }
+            else {
+                entityMap.put( field.getName().toLowerCase(), field.getValue() );
+            }
+        }
+
+        return entityMap;
+    }
+
+
+    private static Collection processCollectionForMap( Collection c ) {
+        if ( c.isEmpty() ) {
+            return c;
+        }
+        List processed = new ArrayList();
+        Object sample = c.iterator().next();
+
+        if ( sample instanceof Entity ) {
+            for ( Object o : c.toArray() ) {
+                Entity e = ( Entity ) o;
+                processed.add( entityToMap( e ) );
+            }
+        }
+        else if ( sample instanceof List ) {
+            for ( Object o : c.toArray() ) {
+                List list = ( List ) o;
+                processed.add( processCollectionForMap( list ) ); // recursion;
+            }
+        }
+        else if ( sample instanceof Set ) {
+            for ( Object o : c.toArray() ) {
+                Set set = ( Set ) o;
+                processed.add( processCollectionForMap( set ) ); // recursion;
+            }
+        }
+        else {
+            for ( Object o : c.toArray() ) {
+                processed.add( o );
+            }
+        }
+        return processed;
+    }
+
+
+    private void initBatch() {
+        this.bulkRequest = client.prepareBulk();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 8401e13..077036a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -18,55 +18,20 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import com.google.common.base.Joiner;
-import com.google.common.util.concurrent.AtomicDouble;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.commons.lang3.time.StopWatch;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
-import org.apache.usergrid.persistence.model.field.ArrayField;
-import org.apache.usergrid.persistence.model.field.BooleanField;
-import org.apache.usergrid.persistence.model.field.DoubleField;
-import org.apache.usergrid.persistence.model.field.EntityObjectField;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.apache.usergrid.persistence.model.field.FloatField;
-import org.apache.usergrid.persistence.model.field.IntegerField;
-import org.apache.usergrid.persistence.model.field.ListField;
-import org.apache.usergrid.persistence.model.field.LocationField;
-import org.apache.usergrid.persistence.model.field.LongField;
-import org.apache.usergrid.persistence.model.field.SetField;
-import org.apache.usergrid.persistence.model.field.StringField;
-import org.apache.usergrid.persistence.model.field.UUIDField;
-import org.apache.usergrid.persistence.model.field.value.EntityObject;
+
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
@@ -78,19 +43,42 @@ import org.elasticsearch.search.sort.SortOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.DOC_ID_SEPARATOR_SPLITTER;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ENTITYID_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createCollectionScopeTypeName;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexName;
+
 
 /**
  * Implements index using ElasticSearch Java API.
  */
 public class EsEntityIndexImpl implements EntityIndex {
 
-    private static final Logger log = LoggerFactory.getLogger(EsEntityIndexImpl.class);
+    private static final Logger log = LoggerFactory.getLogger( EsEntityIndexImpl.class );
 
     private final String indexName;
 
-    private final String indexType;
-
-    private final IndexScope indexScope;
+    private final ApplicationScope applicationScope;
 
     private final Client client;
 
@@ -98,316 +86,94 @@ public class EsEntityIndexImpl implements EntityIndex {
     // of attempting to init them again. Used in the initType() method.
     private Set<String> knownTypes = new TreeSet<String>();
 
-    private final boolean refresh;
     private final int cursorTimeout;
 
-    private final AtomicLong indexedCount = new AtomicLong(0L);
-    private final AtomicDouble averageIndexTime = new AtomicDouble(0);
-
-    public static final String STRING_PREFIX = "su_";
-    public static final String ANALYZED_STRING_PREFIX = "sa_";
-    public static final String GEO_PREFIX = "go_";
-    public static final String NUMBER_PREFIX = "nu_";
-    public static final String BOOLEAN_PREFIX = "bu_";
 
-    public static final String ENTITYID_FIELDNAME = "zzz_entityid_zzz";
+    private final IndexFig config;
 
-    public static final String DOC_ID_SEPARATOR = "|";
-    public static final String DOC_ID_SEPARATOR_SPLITTER = "\\|";
 
-    // These are not allowed in document type names: _ . , | #
-    public static final String DOC_TYPE_SEPARATOR = "^";
-    public static final String DOC_TYPE_SEPARATOR_SPLITTER = "\\^";
-
-    public static final String INDEX_NAME_SEPARATOR = "^";
-
-       
     @Inject
-    public EsEntityIndexImpl(
-            @Assisted final IndexScope indexScope,
-            IndexFig config,
-            EsProvider provider
-    ) {
+    public EsEntityIndexImpl( @Assisted final ApplicationScope applicationScope, final IndexFig config, final EsProvider provider ) {
 
-        IndexValidationUtils.validateIndexScope( indexScope );
+        ValidationUtils.validateApplicationScope( applicationScope );
 
         try {
-            this.indexScope = indexScope;
+            this.applicationScope = applicationScope;
 
             this.client = provider.getClient();
 
-            this.indexName = createIndexName( config.getIndexPrefix(), indexScope);
-            this.indexType = createCollectionScopeTypeName( indexScope );
-
-            this.refresh = config.isForcedRefresh();
+            this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
             this.cursorTimeout = config.getQueryCursorTimeout();
 
-        } catch ( Exception e ) {
-            log.error("Error setting up index", e);
+            this.config = config;
+        }
+        catch ( Exception e ) {
+            log.error( "Error setting up index", e );
             throw e;
         }
 
         AdminClient admin = client.admin();
         try {
-            CreateIndexResponse r = admin.indices().prepareCreate(indexName).execute().actionGet();
-            log.debug("Created new Index Name [{}] ACK=[{}]", indexName, r.isAcknowledged());
+            CreateIndexResponse r = admin.indices().prepareCreate( indexName ).execute().actionGet();
+            log.debug( "Created new Index Name [{}] ACK=[{}]", indexName, r.isAcknowledged() );
 
             client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
 
             try {
                 // TODO: figure out what refresh above is not enough to ensure index is ready
-                Thread.sleep(500);
-            } catch (InterruptedException ex) {}
-
-        } catch (IndexAlreadyExistsException ignored) {
-            // expected
-        }
-    }
-
-  
-    /**
-     * Create ElasticSearch mapping for each type of Entity.
-     */
-    private void initType( String typeName ) {
-
-        // no need for synchronization here, it's OK if we init attempt to init type multiple times
-        if ( knownTypes.contains( typeName )) {
-            return;
-        }
-
-        AdminClient admin = client.admin();
-        try {
-            XContentBuilder mxcb = EsEntityIndexImpl
-                .createDoubleStringIndexMapping(jsonBuilder(), typeName);
-
-            admin.indices().preparePutMapping(indexName)
-                .setType(typeName).setSource(mxcb).execute().actionGet();
-
-            admin.indices().prepareGetMappings(indexName)
-                .addTypes(typeName).execute().actionGet();
-
-//            log.debug("Created new type mapping");
-//            log.debug("   Scope application: " + indexScope.getApplication());
-//            log.debug("   Scope owner: " + indexScope.getOwner());
-//            log.debug("   Type name: " + typeName);
-
-            knownTypes.add( typeName );
-
-        } catch (IndexAlreadyExistsException ignored) {
-            // expected
-        } 
-        catch (IOException ex) {
-            throw new RuntimeException("Exception initing type " + typeName 
-                + " in app " + indexScope.getApplication().toString());
-        }
-    }
-
-
-    /**
-     * Create the index name based on our prefix+appUUID+AppType
-     * @param prefix
-     * @param indexScope
-     * @return
-     */
-    private String createIndexName( 
-            String prefix, IndexScope indexScope) {
-        StringBuilder sb = new StringBuilder();
-        String sep = INDEX_NAME_SEPARATOR;
-        sb.append( prefix ).append(sep);
-        sb.append( indexScope.getApplication().getUuid() ).append(sep);
-        sb.append( indexScope.getApplication().getType() );
-        return sb.toString();
-    }
-
-
-    /**
-     * Create the index doc from the given entity
-     * @param entity
-     * @return
-     */
-    private String createIndexDocId(Entity entity) {
-        return createIndexDocId(entity.getId(), entity.getVersion());
-    }
-
-
-    /**
-     * Create the doc Id. This is the entitie's type + uuid + version
-     * @param entityId
-     * @param version
-     * @return
-     */
-    private String createIndexDocId(Id entityId, UUID version) {
-        StringBuilder sb = new StringBuilder();
-        String sep = DOC_ID_SEPARATOR;
-        sb.append( entityId.getUuid() ).append(sep);
-        sb.append( entityId.getType() ).append(sep);
-        sb.append( version.toString() );
-        return sb.toString();
-    }
-
-
-    /**
-     * Create our sub scope.  This is the ownerUUID + type
-     * @param scope
-     * @return
-     */
-    private static String createCollectionScopeTypeName( IndexScope scope ) {
-        StringBuilder sb = new StringBuilder();
-        String sep = DOC_TYPE_SEPARATOR;
-        sb.append( scope.getApplication().getUuid() ).append(sep);
-        sb.append( scope.getApplication().getType() ).append(sep);
-        sb.append( scope.getOwner().getUuid() ).append(sep);
-        sb.append( scope.getOwner().getType() ).append(sep);
-        sb.append( scope.getName() );
-        return sb.toString();
-    }
-
-
-    
-    @Override
-    public void index( Entity entity ) {
-
-        if ( log.isDebugEnabled() ) {
-            log.debug("Indexing entity {}:{} in scope\n   app {}\n   owner {}\n   name {}\n   type {}", 
-                new Object[] { 
-                    entity.getId().getType(), 
-                    entity.getId().getUuid(), 
-                    indexScope.getApplication(), 
-                    indexScope.getOwner(), 
-                    indexScope.getName(),
-                    indexType
-            });
-        }
-
-        ValidationUtils.verifyEntityWrite(entity);
-
-        initType( indexType );
-
-        StopWatch timer = null;
-        if ( log.isDebugEnabled() ) {
-            timer = new StopWatch();
-            timer.start();
-        }
-
-        Map<String, Object> entityAsMap = EsEntityIndexImpl.entityToMap(entity);
-
-        // need prefix here becuase we index UUIDs as strings
-        entityAsMap.put( STRING_PREFIX + ENTITYID_FIELDNAME, 
-            entity.getId().getUuid().toString().toLowerCase());
-
-        // let caller add these fields if needed
-        // entityAsMap.put("created", entity.getId().getUuid().timestamp();
-        // entityAsMap.put("updated", entity.getVersion().timestamp());
-
-        String indexId = EsEntityIndexImpl.this.createIndexDocId(entity);
-
-        log.debug("Indexing entity id {} data {} ", indexId, entityAsMap);
-
-        IndexRequestBuilder irb = client
-            .prepareIndex( indexName, this.indexType, indexId)
-            .setSource(entityAsMap)
-            .setRefresh(refresh);
-
-        irb.execute().actionGet();
-
-        if ( refresh) {
-            refresh();
-        }
-
-        //log.debug("Indexed Entity with index id " + indexId);
-
-        if ( log.isDebugEnabled() ) {
-            timer.stop();
-            double average = averageIndexTime.get();
-            if ( !averageIndexTime.compareAndSet( 0, timer.getTime() ) ) {
-                averageIndexTime.compareAndSet( average, (average + timer.getTime()) / 2.0 );
+                Thread.sleep( 500 );
             }
-            long count = indexedCount.addAndGet(1);
-            if ( count % 1000 == 0 ) {
-                log.debug("Indexed {} entities, average time {}ms", 
-                        count, averageIndexTime.get() );
+            catch ( InterruptedException ex ) {
             }
         }
-    }
-
-    @Override
-    public void deindex( final Id id, final UUID version) {
-
-        if ( log.isDebugEnabled() ) {
-            log.debug("De-indexing entity {}:{} in scope\n   app {}\n   owner {}\n   name {} type {}", 
-                new Object[] { 
-                    id.getType(), 
-                    id.getUuid(), 
-                    indexScope.getApplication(), 
-                    indexScope.getOwner(), 
-                    indexScope.getName(),
-                    indexType
-            });
-        }
-
-        String indexId = createIndexDocId( id, version ); 
-        client.prepareDelete( indexName, indexType, indexId )
-            .setRefresh( refresh )
-            .execute().actionGet();
-
-        if ( refresh ) {
-            refresh();
+        catch ( IndexAlreadyExistsException ignored ) {
+            // expected
         }
-
-        log.debug("Deindexed Entity with index id " + indexId);
     }
-    @Override
-    public void deindex( Entity entity ) {
 
-        deindex( entity.getId(), entity.getVersion() );
-    }
 
     @Override
-    public void deindex( CandidateResult entity ) {
-
-        deindex( entity.getId(), entity.getVersion() );
-
+    public EntityIndexBatch createBatch() {
+        return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes );
     }
 
 
     @Override
-    public CandidateResults search(Query query) {
+    public CandidateResults search( final IndexScope indexScope, final Query query ) {
+
+        final String indexType = createCollectionScopeTypeName( indexScope );
 
         QueryBuilder qb = query.createQueryBuilder();
 
         if ( log.isDebugEnabled() ) {
-            log.debug("Searching index {}\n   type {}\n   query {} limit {}", 
-                new Object[] { 
-                    this.indexName,
-                    this.indexType,
-                    qb.toString().replace("\n", " "),
-                    query.getLimit()
-            });
+            log.debug( "Searching index {}\n   type {}\n   query {} limit {}", new Object[] {
+                            this.indexName, indexType, qb.toString().replace( "\n", " " ), query.getLimit()
+                    } );
         }
-            
+
         SearchResponse searchResponse;
-        if (query.getCursor() == null) {
+        if ( query.getCursor() == null ) {
 
 
-            SearchRequestBuilder srb = client.prepareSearch(indexName)
-                .setTypes( indexType )
-                .setScroll( cursorTimeout + "m" )
-                .setQuery( qb );
+            SearchRequestBuilder srb =
+                    client.prepareSearch( indexName ).setTypes( indexType ).setScroll( cursorTimeout + "m" )
+                          .setQuery( qb );
 
             FilterBuilder fb = query.createFilterBuilder();
-            if (fb != null) {
-                log.debug("   Filter: {} ", fb.toString());
-                srb = srb.setPostFilter(fb);
+            if ( fb != null ) {
+                log.debug( "   Filter: {} ", fb.toString() );
+                srb = srb.setPostFilter( fb );
             }
 
-            srb = srb.setFrom(0).setSize(query.getLimit());
+            srb = srb.setFrom( 0 ).setSize( query.getLimit() );
 
-            for (Query.SortPredicate sp : query.getSortPredicates()) {
+            for ( Query.SortPredicate sp : query.getSortPredicates() ) {
 
                 final SortOrder order;
-                if (sp.getDirection().equals(Query.SortDirection.ASCENDING)) {
+                if ( sp.getDirection().equals( Query.SortDirection.ASCENDING ) ) {
                     order = SortOrder.ASC;
-                } else {
+                }
+                else {
                     order = SortOrder.DESC;
                 }
 
@@ -416,272 +182,117 @@ public class EsEntityIndexImpl implements EntityIndex {
                 // that you can order by: string, number and boolean and we ask ElasticSearch 
                 // to ignore any fields that are not present.
 
-                final String stringFieldName = STRING_PREFIX + sp.getPropertyName(); 
-                final FieldSortBuilder stringSort = SortBuilders
-                    .fieldSort( stringFieldName )
-                    .order(order)
-                    .ignoreUnmapped(true);
+                final String stringFieldName = STRING_PREFIX + sp.getPropertyName();
+                final FieldSortBuilder stringSort =
+                        SortBuilders.fieldSort( stringFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( stringSort );
-                log.debug("   Sort: {} order by {}", stringFieldName, order.toString());
+                log.debug( "   Sort: {} order by {}", stringFieldName, order.toString() );
 
-                final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName(); 
-                final FieldSortBuilder numberSort = SortBuilders
-                    .fieldSort( numberFieldName )
-                    .order(order)
-                    .ignoreUnmapped(true);
+                final String numberFieldName = NUMBER_PREFIX + sp.getPropertyName();
+                final FieldSortBuilder numberSort =
+                        SortBuilders.fieldSort( numberFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( numberSort );
-                log.debug("   Sort: {} order by {}", numberFieldName, order.toString());
+                log.debug( "   Sort: {} order by {}", numberFieldName, order.toString() );
 
-                final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName(); 
-                final FieldSortBuilder booleanSort = SortBuilders
-                        .fieldSort( booleanFieldName )
-                        .order(order)
-                        .ignoreUnmapped(true);
+                final String booleanFieldName = BOOLEAN_PREFIX + sp.getPropertyName();
+                final FieldSortBuilder booleanSort =
+                        SortBuilders.fieldSort( booleanFieldName ).order( order ).ignoreUnmapped( true );
                 srb.addSort( booleanSort );
-                log.debug("   Sort: {} order by {}", booleanFieldName, order.toString());
+                log.debug( "   Sort: {} order by {}", booleanFieldName, order.toString() );
             }
 
             searchResponse = srb.execute().actionGet();
-
-        } else {
+        }
+        else {
             String scrollId = query.getCursor();
-            if ( scrollId.startsWith("\"")) {
-                scrollId = scrollId.substring(1);
+            if ( scrollId.startsWith( "\"" ) ) {
+                scrollId = scrollId.substring( 1 );
             }
-            if ( scrollId.endsWith("\"")) {
-                scrollId = scrollId.substring(0, scrollId.length() - 1 );
+            if ( scrollId.endsWith( "\"" ) ) {
+                scrollId = scrollId.substring( 0, scrollId.length() - 1 );
             }
-            log.debug("Executing query with cursor: {} ", scrollId);
+            log.debug( "Executing query with cursor: {} ", scrollId );
 
-            SearchScrollRequestBuilder ssrb = client
-                .prepareSearchScroll(scrollId)
-                .setScroll( cursorTimeout + "m" );
+            SearchScrollRequestBuilder ssrb = client.prepareSearchScroll( scrollId ).setScroll( cursorTimeout + "m" );
             searchResponse = ssrb.execute().actionGet();
         }
 
         SearchHits hits = searchResponse.getHits();
-        log.debug("   Hit count: {} Total hits: {}", hits.getHits().length, hits.getTotalHits() );
+        log.debug( "   Hit count: {} Total hits: {}", hits.getHits().length, hits.getTotalHits() );
 
         List<CandidateResult> candidates = new ArrayList<CandidateResult>();
 
-        for (SearchHit hit : hits.getHits()) {
+        for ( SearchHit hit : hits.getHits() ) {
 
             String[] idparts = hit.getId().split( DOC_ID_SEPARATOR_SPLITTER );
-            String id      = idparts[0];
-            String type    = idparts[1];
+            String id = idparts[0];
+            String type = idparts[1];
             String version = idparts[2];
 
-            Id entityId = new SimpleId(UUID.fromString(id), type);
+            Id entityId = new SimpleId( UUID.fromString( id ), type );
 
-            candidates.add( new CandidateResult( entityId, UUID.fromString(version) ));
+            candidates.add( new CandidateResult( entityId, UUID.fromString( version ) ) );
         }
 
         CandidateResults candidateResults = new CandidateResults( query, candidates );
 
         if ( candidates.size() >= query.getLimit() ) {
-            candidateResults.setCursor(searchResponse.getScrollId());
-            log.debug("   Cursor = " + searchResponse.getScrollId() );
-        } 
-
-        return candidateResults;
-    }
-
-
-    /**
-     * Convert Entity to Map. Adding prefixes for types:
-     * 
-     *    su_ - String unanalyzed field
-     *    sa_ - String analyzed field
-     *    go_ - Location field
-     *    nu_ - Number field
-     *    bu_ - Boolean field
-     */
-    private static Map entityToMap(EntityObject entity) {
-
-        Map<String, Object> entityMap = new HashMap<String, Object>();
-
-        for (Object f : entity.getFields().toArray()) {
-
-            Field field = (Field) f;
-
-            if (f instanceof ListField)  {
-                List list = (List) field.getValue();
-                entityMap.put(field.getName().toLowerCase(),
-                        new ArrayList(processCollectionForMap(list)));
-
-                if ( !list.isEmpty() ) {
-                    if ( list.get(0) instanceof String ) {
-                        Joiner joiner = Joiner.on(" ").skipNulls();
-                        String joined = joiner.join(list);
-                        entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
-                            new ArrayList(processCollectionForMap(list)));
-                        
-                    }
-                }
-
-            } else if (f instanceof ArrayField) {
-                List list = (List) field.getValue();
-                entityMap.put(field.getName().toLowerCase(),
-                        new ArrayList(processCollectionForMap(list)));
-
-            } else if (f instanceof SetField) {
-                Set set = (Set) field.getValue();
-                entityMap.put(field.getName().toLowerCase(),
-                        new ArrayList(processCollectionForMap(set)));
-
-            } else if (f instanceof EntityObjectField) {
-                EntityObject eo = (EntityObject)field.getValue();
-                entityMap.put(field.getName().toLowerCase(), entityToMap(eo)); // recursion
-
-            // Add type information as field-name prefixes
-
-            } else if (f instanceof StringField) {
-
-                // index in lower case because Usergrid queries are case insensitive
-                entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(), 
-                        ((String) field.getValue()).toLowerCase());
-                entityMap.put( STRING_PREFIX + field.getName().toLowerCase(),
-                        ((String) field.getValue()).toLowerCase());
-
-            } else if (f instanceof LocationField) {
-                LocationField locField = (LocationField) f;
-                Map<String, Object> locMap = new HashMap<String, Object>();
-
-                // field names lat and lon trigger ElasticSearch geo location 
-                locMap.put("lat", locField.getValue().getLatitude());
-                locMap.put("lon", locField.getValue().getLongtitude());
-                entityMap.put( GEO_PREFIX + field.getName().toLowerCase(), locMap);
-
-            } else if ( f instanceof DoubleField
-                     || f instanceof FloatField
-                     || f instanceof IntegerField
-                     || f instanceof LongField ) {
-
-                entityMap.put( NUMBER_PREFIX + field.getName().toLowerCase(), field.getValue());
-
-            } else if ( f instanceof BooleanField ) {
-
-                entityMap.put( BOOLEAN_PREFIX + field.getName().toLowerCase(), field.getValue());
-
-            } else if ( f instanceof UUIDField ) {
-
-                entityMap.put( STRING_PREFIX + field.getName().toLowerCase(),
-                    field.getValue().toString() );
-
-            } else {
-                entityMap.put(field.getName().toLowerCase(), field.getValue());
-            }
-        }
-
-        return entityMap;
-    }
-
-
-    private static Collection processCollectionForMap(Collection c) {
-        if (c.isEmpty()) {
-            return c;
+            candidateResults.setCursor( searchResponse.getScrollId() );
+            log.debug( "   Cursor = " + searchResponse.getScrollId() );
         }
-        List processed = new ArrayList();
-        Object sample = c.iterator().next();
-
-        if (sample instanceof Entity) {
-            for (Object o : c.toArray()) {
-                Entity e = (Entity) o;
-                processed.add(entityToMap(e));
-            }
 
-        } else if (sample instanceof List) {
-            for (Object o : c.toArray()) {
-                List list = (List) o;
-                processed.add(processCollectionForMap(list)); // recursion;
-            }
-
-        } else if (sample instanceof Set) {
-            for (Object o : c.toArray()) {
-                Set set = (Set) o;
-                processed.add(processCollectionForMap(set)); // recursion;
-            }
-
-        } else {
-            for (Object o : c.toArray()) {
-                processed.add(o);
-            }
-        }
-        return processed;
+        return candidateResults;
     }
 
 
     /**
-     * Build mappings for data to be indexed. Setup String fields as not_analyzed and analyzed,
-     * where the analyzed field is named {name}_ug_analyzed
+     * Build mappings for data to be indexed. Setup String fields as not_analyzed and analyzed, where the analyzed field
+     * is named {name}_ug_analyzed
      *
      * @param builder Add JSON object to this builder.
      * @param type ElasticSearch type of entity.
+     *
      * @return Content builder with JSON for mapping.
      *
      * @throws java.io.IOException On JSON generation error.
      */
-    public static XContentBuilder createDoubleStringIndexMapping(
-            XContentBuilder builder, String type) throws IOException {
-
-        builder = builder
-            .startObject()
-                .startObject(type)
-                    .startArray("dynamic_templates")
-
-                        // any string with field name that starts with sa_ gets analyzed
-                        .startObject()
-                            .startObject("template_1")
-                                .field("match", ANALYZED_STRING_PREFIX + "*")
-                                .field("match_mapping_type", "string")
-                                .startObject("mapping")
-                                    .field("type", "string")
-                                    .field("index", "analyzed")
-                                .endObject()
-                            .endObject()
-                        .endObject()
+    public static XContentBuilder createDoubleStringIndexMapping( XContentBuilder builder, String type )
+            throws IOException {
+
+        builder = builder.startObject().startObject( type ).startArray( "dynamic_templates" )
+
+                // any string with field name that starts with sa_ gets analyzed
+                .startObject().startObject( "template_1" ).field( "match", ANALYZED_STRING_PREFIX + "*" )
+                .field( "match_mapping_type", "string" ).startObject( "mapping" ).field( "type", "string" )
+                .field( "index", "analyzed" ).endObject().endObject().endObject()
 
                         // all other strings are not analyzed
-                        .startObject()
-                            .startObject("template_2")
-                                .field("match", "*")
-                                .field("match_mapping_type", "string")
-                                .startObject("mapping")
-                                    .field("type", "string")
-                                    .field("index", "not_analyzed")
-                                .endObject()
-                            .endObject()
-                        .endObject()
-                
+                .startObject().startObject( "template_2" ).field( "match", "*" ).field( "match_mapping_type", "string" )
+                .startObject( "mapping" ).field( "type", "string" ).field( "index", "not_analyzed" ).endObject()
+                .endObject().endObject()
+
                         // fields names starting with go_ get geo-indexed
-                        .startObject()
-                            .startObject("template_3")
-                                .field("match", GEO_PREFIX + "location")
-                                .startObject("mapping")
-                                    .field("type", "geo_point")
-                                .endObject()
-                            .endObject()
-                        .endObject()
-
-                    .endArray()
-                .endObject()
-            .endObject();
+                .startObject().startObject( "template_3" ).field( "match", GEO_PREFIX + "location" )
+                .startObject( "mapping" ).field( "type", "geo_point" ).endObject().endObject().endObject()
+
+                .endArray().endObject().endObject();
 
         return builder;
     }
 
 
-    public void refresh() {
-        client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
-        log.debug("Refreshed index: " + indexName);
-    }
-
     @Override
-    public CandidateResults getEntityVersions(Id id) {
+    public CandidateResults getEntityVersions( final IndexScope scope, final Id id ) {
         Query query = new Query();
-        query.addEqualityFilter(ENTITYID_FIELDNAME,id.getUuid().toString());
-        CandidateResults results = search( query );
+        query.addEqualityFilter( ENTITYID_FIELDNAME, id.getUuid().toString() );
+        CandidateResults results = search( scope, query );
         return results;
     }
 
+
+    @Override
+    public void refresh() {
+        client.admin().indices().prepareRefresh( indexName ).execute().actionGet();
+        log.debug( "Refreshed index: " + indexName );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
index 5634183..0afe992 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsQueryVistor.java
@@ -18,19 +18,24 @@
 
 package org.apache.usergrid.persistence.index.impl;
 
-import com.google.common.base.Joiner;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 import java.util.UUID;
+
+import org.elasticsearch.common.unit.DistanceUnit;
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.exceptions.NoFullTextIndexException;
 import org.apache.usergrid.persistence.index.exceptions.NoIndexException;
-import org.apache.usergrid.persistence.index.exceptions.IndexException;
-import static org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl.ANALYZED_STRING_PREFIX;
-import static org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl.BOOLEAN_PREFIX;
-import static org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl.GEO_PREFIX;
-import static org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl.NUMBER_PREFIX;
-import static org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl.STRING_PREFIX;
 import org.apache.usergrid.persistence.index.query.tree.AndOperand;
 import org.apache.usergrid.persistence.index.query.tree.ContainsOperand;
 import org.apache.usergrid.persistence.index.query.tree.Equal;
@@ -42,14 +47,14 @@ import org.apache.usergrid.persistence.index.query.tree.NotOperand;
 import org.apache.usergrid.persistence.index.query.tree.OrOperand;
 import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
 import org.apache.usergrid.persistence.index.query.tree.WithinOperand;
-import org.elasticsearch.common.unit.DistanceUnit;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilder;
-import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.GEO_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PREFIX;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
new file mode 100644
index 0000000..d607700
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -0,0 +1,112 @@
+package org.apache.usergrid.persistence.index.impl;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class IndexingUtils {
+
+
+    public static final String STRING_PREFIX = "su_";
+    public static final String ANALYZED_STRING_PREFIX = "sa_";
+    public static final String GEO_PREFIX = "go_";
+    public static final String NUMBER_PREFIX = "nu_";
+    public static final String BOOLEAN_PREFIX = "bu_";
+
+    public static final String ENTITYID_FIELDNAME = "zzz_entityid_zzz";
+
+    public static final String DOC_ID_SEPARATOR = "|";
+    public static final String DOC_ID_SEPARATOR_SPLITTER = "\\|";
+
+    // These are not allowed in document type names: _ . , | #
+    public static final String DOC_TYPE_SEPARATOR = "^";
+    public static final String DOC_TYPE_SEPARATOR_SPLITTER = "\\^";
+
+    public static final String INDEX_NAME_SEPARATOR = "^";
+
+
+    /**
+      * Create our sub scope.  This is the ownerUUID + type
+      * @param scope
+      * @return
+      */
+     public static String createCollectionScopeTypeName( IndexScope scope ) {
+         StringBuilder sb = new StringBuilder();
+         String sep = DOC_TYPE_SEPARATOR;
+         sb.append( scope.getApplication().getUuid() ).append(sep);
+         sb.append( scope.getApplication().getType() ).append(sep);
+         sb.append( scope.getOwner().getUuid() ).append(sep);
+         sb.append( scope.getOwner().getType() ).append(sep);
+         sb.append( scope.getName() );
+         return sb.toString();
+     }
+
+
+
+    /**
+     * Create the index name based on our prefix+appUUID+AppType
+     * @param prefix
+     * @param applicationScope
+     * @return
+     */
+    public static String createIndexName(
+            String prefix, ApplicationScope applicationScope) {
+        StringBuilder sb = new StringBuilder();
+        String sep = INDEX_NAME_SEPARATOR;
+        sb.append( prefix ).append(sep);
+        sb.append( applicationScope.getApplication().getUuid() ).append(sep);
+        sb.append( applicationScope.getApplication().getType() );
+        return sb.toString();
+    }
+
+
+
+    /**
+     * Create the index doc from the given entity
+     * @param entity
+     * @return
+     */
+    public static String createIndexDocId(Entity entity) {
+        return createIndexDocId(entity.getId(), entity.getVersion());
+    }
+
+
+    /**
+     * Create the doc Id. This is the entitie's type + uuid + version
+     * @param entityId
+     * @param version
+     * @return
+     */
+    public static String createIndexDocId(Id entityId, UUID version) {
+        StringBuilder sb = new StringBuilder();
+        String sep = DOC_ID_SEPARATOR;
+        sb.append( entityId.getUuid() ).append(sep);
+        sb.append( entityId.getType() ).append(sep);
+        sb.append( version.toString() );
+        return sb.toString();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
index 9419809..c4f8f44 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory
 import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
 import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
@@ -167,12 +168,12 @@ public class CorePerformanceIT extends BaseIT {
             Query query = Query.fromQL( "review_score > 0"); // get all reviews;
             query.withLimit( maxEntities < 1000 ? maxEntities : 1000 );
 
-            CandidateResults candidateResults = eci.search( query );
+            CandidateResults candidateResults = eci.search(indexScope,  query );
             int count = candidateResults.size();
 
             while ( candidateResults.hasCursor() && count < maxEntities ) {
                 query.setCursor( candidateResults.getCursor() )   ;
-                candidateResults = eci.search( query );
+                candidateResults = eci.search(indexScope,  query );
                 count += candidateResults.size();
 
                 //cause retrieval from cassandra
@@ -221,6 +222,9 @@ public class CorePerformanceIT extends BaseIT {
 //            Id appId = orgAppScope.scope.getOwner();
 
             int count = 0;
+
+            EntityIndexBatch entityIndexBatch = eci.createBatch();
+
             try {
                 while ( (s = br.readLine()) != null && count < maxEntities ) {
                     
@@ -230,7 +234,8 @@ public class CorePerformanceIT extends BaseIT {
                             
                             // write and index current entity
                             ecm.write( current ).toBlocking().last();
-                            eci.index( current );
+
+                            entityIndexBatch.index(indexScope, current  );
                             
                             if ( maxEntities < 20 ) {
                                 log.info("Index written for {}", current.getId());
@@ -242,6 +247,10 @@ public class CorePerformanceIT extends BaseIT {
                                     new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
                             
                             count++;
+                            if(count % 1000 == 0){
+                                entityIndexBatch.execute();
+                            }
+
                             if (count % 100000 == 0) {
                                 log.info("Indexed {} reviews in {} / {} ", 
                                     new Object[] { 
@@ -295,23 +304,23 @@ public class CorePerformanceIT extends BaseIT {
 
             // TODO: come up with more and more complex queries for CorePerformanceIT
 
-            query(eci, "product_productid = 'B006K2ZZ7K'") ;
-            query(eci, "review_profilename = 'Twoapennything'") ;
-            query(eci, "review_profilename contains 'Natalia'") ;
-            query(eci, "review_profilename contains 'Patrick'") ;
-            query(eci, "review_time = 1342051200") ;
-            query(eci, "review_time > 1342051200") ;
-            query(eci, "review_score > 0");
-            query(eci, "review_score > 2");
-            query(eci, "review_score > 3");
-            query(eci, "review_score > 4");
-            query(eci, "review_score > 5");
+            query(indexScope, eci, "product_productid = 'B006K2ZZ7K'") ;
+            query(indexScope, eci, "review_profilename = 'Twoapennything'") ;
+            query(indexScope, eci, "review_profilename contains 'Natalia'") ;
+            query(indexScope, eci, "review_profilename contains 'Patrick'") ;
+            query(indexScope, eci, "review_time = 1342051200") ;
+            query(indexScope, eci, "review_time > 1342051200") ;
+            query(indexScope, eci, "review_score > 0");
+            query(indexScope, eci, "review_score > 2");
+            query(indexScope, eci, "review_score > 3");
+            query(indexScope, eci, "review_score > 4");
+            query(indexScope, eci, "review_score > 5");
         }
     }
 
-    public static void query( EntityIndex eci, String query ) {;
+    public static void query(final IndexScope indexScope, final EntityIndex eci, final String query ) {;
         Query q = Query.fromQL(query) ;
-        CandidateResults candidateResults = eci.search( q );
+        CandidateResults candidateResults = eci.search(indexScope,  q );
         log.info("size = {} returned from query {}", candidateResults.size(), q.getQl() );
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index f8df2a2..5fb02f2 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
@@ -105,12 +106,14 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         IndexScope scope = new IndexScopeImpl( appId, person.getId(), "likes" );
 
         EntityIndex personLikesIndex = ecif.createEntityIndex( scope );
-        personLikesIndex.index( muffin );
 
-        personLikesIndex.refresh();
+        EntityIndexBatch batch = personLikesIndex.createBatch();
+
+        batch.index( scope, muffin );
+        batch.executeAndRefresh();
 
         // now, let's search for things that Dave likes
-        CandidateResults likes = personLikesIndex.search( Query.fromQL( "select *" ) );
+        CandidateResults likes = personLikesIndex.search(scope,  Query.fromQL( "select *" ) );
         assertEquals( 1, likes.size() );
         assertEquals(muffin.getId(), likes.get(0).getId());
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fba71c2d/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index d4b8d5b..e01c45d 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.guice.TestIndexModule;
@@ -104,6 +105,9 @@ public class EntityIndexTest extends BaseIT {
         int count = 0;
         StopWatch timer = new StopWatch();
         timer.start();
+
+        final EntityIndexBatch batch = entityIndex.createBatch();
+
         for ( Object o : sampleJson ) {
 
             Map<String, Object> item = ( Map<String, Object> ) o;
@@ -112,12 +116,20 @@ public class EntityIndexTest extends BaseIT {
             entity = EntityIndexMapUtils.fromMap( entity, item );
             EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
 
-            entityIndex.index( entity );
+            batch.index( indexScope, entity );
+
+            if(count %1000 == 0){
+                batch.execute();
+            }
+
+
 
             if ( count++ > MAX_ENTITIES ) {
                 break;
             }
         }
+
+        batch.execute();
         timer.stop();
         log.info( "Total time to index {} entries {}ms, average {}ms/entry",
                 new Object[] { count, timer.getTime(), timer.getTime() / count } );
@@ -125,7 +137,7 @@ public class EntityIndexTest extends BaseIT {
         entityIndex.refresh();
 
 
-        testQueries( entityIndex );
+        testQueries( indexScope, entityIndex );
     }
 
 
@@ -147,29 +159,27 @@ public class EntityIndexTest extends BaseIT {
         Entity entity = EntityIndexMapUtils.fromMap( entityMap );
         EntityUtils.setId( entity, new SimpleId( "fastcar" ) );
         EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-        entityIndex.index( entity );
+        entityIndex.createBatch().index(indexScope , entity ).executeAndRefresh();
 
-        entityIndex.refresh();
-
-        CandidateResults candidateResults = entityIndex.search( Query.fromQL( "name contains 'Ferrari*'" ) );
+        CandidateResults candidateResults = entityIndex.search(indexScope,  Query.fromQL( "name contains 'Ferrari*'" ) );
         assertEquals( 1, candidateResults.size() );
 
-        entityIndex.deindex( entity );
+        entityIndex.createBatch().deindex( indexScope, entity ).execute();
 
         entityIndex.refresh();
 
-        candidateResults = entityIndex.search( Query.fromQL( "name contains 'Ferrari*'" ) );
+        candidateResults = entityIndex.search( indexScope, Query.fromQL( "name contains 'Ferrari*'" ) );
         assertEquals( 0, candidateResults.size() );
     }
 
 
-    private void testQuery( EntityIndex entityIndex, String queryString, int num ) {
+    private void testQuery(final IndexScope scope, final EntityIndex entityIndex, final String queryString, final int num ) {
 
         StopWatch timer = new StopWatch();
         timer.start();
         Query query = Query.fromQL( queryString );
         query.setLimit( 1000 );
-        CandidateResults candidateResults = entityIndex.search( query );
+        CandidateResults candidateResults = entityIndex.search( scope, query );
         timer.stop();
 
         assertEquals( num, candidateResults.size() );
@@ -177,34 +187,34 @@ public class EntityIndexTest extends BaseIT {
     }
 
 
-    private void testQueries( EntityIndex entityIndex ) {
+    private void testQueries(final IndexScope scope, final EntityIndex entityIndex ) {
 
 
-        testQuery( entityIndex, "name = 'Morgan Pierce'", 1 );
+        testQuery(scope,  entityIndex, "name = 'Morgan Pierce'", 1 );
 
-        testQuery( entityIndex, "name = 'morgan pierce'", 1 );
+        testQuery(scope,  entityIndex, "name = 'morgan pierce'", 1 );
 
-        testQuery( entityIndex, "name = 'Morgan'", 0 );
+        testQuery(scope,  entityIndex, "name = 'Morgan'", 0 );
 
-        testQuery( entityIndex, "name contains 'Morgan'", 1 );
+        testQuery(scope,  entityIndex, "name contains 'Morgan'", 1 );
 
-        testQuery( entityIndex, "company > 'GeoLogix'", 64 );
+        testQuery(scope,  entityIndex, "company > 'GeoLogix'", 64 );
 
-        testQuery( entityIndex, "gender = 'female'", 45 );
+        testQuery(scope,  entityIndex, "gender = 'female'", 45 );
 
-        testQuery( entityIndex, "name = 'Minerva Harrell' and age > 39", 1 );
+        testQuery(scope,  entityIndex, "name = 'Minerva Harrell' and age > 39", 1 );
 
-        testQuery( entityIndex, "name = 'Minerva Harrell' and age > 39 and age < 41", 1 );
+        testQuery(scope,  entityIndex, "name = 'Minerva Harrell' and age > 39 and age < 41", 1 );
 
-        testQuery( entityIndex, "name = 'Minerva Harrell' and age > 40", 0 );
+        testQuery(scope,  entityIndex, "name = 'Minerva Harrell' and age > 40", 0 );
 
-        testQuery( entityIndex, "name = 'Minerva Harrell' and age >= 40", 1 );
+        testQuery(scope,  entityIndex, "name = 'Minerva Harrell' and age >= 40", 1 );
 
-        testQuery( entityIndex, "name = 'Minerva Harrell' and age <= 40", 1 );
+        testQuery(scope,  entityIndex, "name = 'Minerva Harrell' and age <= 40", 1 );
         
-        testQuery( entityIndex, "name = 'Morgan* '", 1 );
+        testQuery(scope,  entityIndex, "name = 'Morgan* '", 1 );
         
-        testQuery( entityIndex, "name = 'Morgan*'", 1 );
+        testQuery(scope,  entityIndex, "name = 'Morgan*'", 1 );
 
         
         // test a couple of array sub-property queries
@@ -212,16 +222,16 @@ public class EntityIndexTest extends BaseIT {
         int totalUsers = 102;
 
         // nobody has a friend named Jack the Ripper
-        testQuery( entityIndex, "friends.name = 'Jack the Ripper'", 0 );
+        testQuery(scope,  entityIndex, "friends.name = 'Jack the Ripper'", 0 );
 
         // everybody doesn't have a friend named Jack the Ripper
-        testQuery( entityIndex, "not (friends.name = 'Jack the Ripper')", totalUsers );
+        testQuery(scope,  entityIndex, "not (friends.name = 'Jack the Ripper')", totalUsers );
 
         // one person has a friend named Shari Hahn
-        testQuery( entityIndex, "friends.name = 'Wendy Moody'", 1 );
+        testQuery(scope,  entityIndex, "friends.name = 'Wendy Moody'", 1 );
 
         // everybody but 1 doesn't have a friend named Shari Hahh
-        testQuery( entityIndex, "not (friends.name = 'Shari Hahn')", totalUsers - 1);
+        testQuery(scope,  entityIndex, "not (friends.name = 'Shari Hahn')", totalUsers - 1);
 
     }
 
@@ -282,17 +292,20 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setId( user, new SimpleId( "edanuff" ) );
         EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
 
-        entityIndex.index( user );
+
+        final EntityIndexBatch batch = entityIndex.createBatch();
+
+        batch.index( indexScope, user );
 
         user.setField( new StringField( "address1", "1782 address st" ) );
-        entityIndex.index( user );
+        batch.index( indexScope, user );
         user.setField( new StringField( "address2", "apt 508" ) );
-        entityIndex.index( user );
+        batch.index( indexScope,  user );
         user.setField( new StringField( "address3", "apt 508" ) );
-        entityIndex.index( user );
-        entityIndex.refresh();
+        batch.index( indexScope,  user );
+        batch.executeAndRefresh();
 
-        CandidateResults results = entityIndex.getEntityVersions( user.getId() );
+        CandidateResults results = entityIndex.getEntityVersions(indexScope,  user.getId() );
 
         assertEquals(1,  results.size());
         assertEquals( results.get( 0 ).getId(), user.getId() );
@@ -323,21 +336,21 @@ public class EntityIndexTest extends BaseIT {
         EntityUtils.setVersion( user, UUIDGenerator.newTimeUUID() );
 
 
-        ei.index( user );
-        ei.refresh();
+        EntityIndexBatch batch = ei.createBatch();
+
+        batch.index( appScope, user ).executeAndRefresh();
         Query query = new Query();
         query.addEqualityFilter( "username", "edanuff" );
-        CandidateResults r = ei.search( query );
+        CandidateResults r = ei.search( appScope, query );
         assertEquals( user.getId(), r.get( 0 ).getId() );
 
-        ei.deindex( user.getId(), user.getVersion() );
-        ei.refresh();
+        batch.deindex(appScope, user.getId(), user.getVersion() ).executeAndRefresh();
 
 
         // EntityRef
         query = new Query();
         query.addEqualityFilter( "username", "edanuff" );
-        r = ei.search( query );
+        r = ei.search(appScope, query );
 
         assertFalse( r.iterator().hasNext() );
     }
@@ -362,7 +375,10 @@ public class EntityIndexTest extends BaseIT {
         Entity bill = EntityIndexMapUtils.fromMap( billMap );
         EntityUtils.setId( bill, new SimpleId( UUIDGenerator.newTimeUUID(), "user"  ) );
         EntityUtils.setVersion( bill, UUIDGenerator.newTimeUUID() );
-        ei.index( bill );
+
+        EntityIndexBatch batch = ei.createBatch();
+
+        batch.index( appScope,  bill );
 
         // Fred has age as int, favorites as object and retirement goal as object
         Map fredMap = new HashMap() {{
@@ -382,28 +398,28 @@ public class EntityIndexTest extends BaseIT {
         Entity fred = EntityIndexMapUtils.fromMap( fredMap );
         EntityUtils.setId( fred, new SimpleId( UUIDGenerator.newTimeUUID(), "user"  ) );
         EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
-        ei.index( fred );
+        batch.index( appScope, fred );
 
-        ei.refresh();
+        batch.executeAndRefresh();
 
         Query query = new Query();
         query.addEqualityFilter( "username", "bill" );
-        CandidateResults r = ei.search( query );
+        CandidateResults r = ei.search( appScope, query );
         assertEquals( bill.getId(), r.get( 0 ).getId() );
 
         query = new Query();
         query.addEqualityFilter( "username", "fred" );
-        r = ei.search( query );
+        r = ei.search( appScope,  query );
         assertEquals( fred.getId(), r.get( 0 ).getId() );
 
         query = new Query();
         query.addEqualityFilter( "age", 41 );
-        r = ei.search( query );
+        r = ei.search( appScope,  query );
         assertEquals( fred.getId(), r.get( 0 ).getId() );
 
         query = new Query();
         query.addEqualityFilter( "age", "thirtysomething" );
-        r = ei.search( query );
+        r = ei.search(  appScope, query );
         assertEquals( bill.getId(), r.get( 0 ).getId() );
     }
 }


[2/2] git commit: First pass of changing cp and rm manager to use batches

Posted by to...@apache.org.
First pass of changing cp and rm manager to use batches


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bba08ddc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bba08ddc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bba08ddc

Branch: refs/heads/esbatching
Commit: bba08ddc1ed27b719ecaf46c63eb3f20d4bef8f5
Parents: fba71c2
Author: Todd Nine <to...@apache.org>
Authored: Wed Oct 1 22:47:06 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Oct 1 22:47:06 2014 -0600

----------------------------------------------------------------------
 .../CpEntityIndexDeleteListener.java            |  8 +-
 .../corepersistence/CpEntityManager.java        | 40 ++++++---
 .../corepersistence/CpEntityManagerFactory.java | 33 ++++---
 .../corepersistence/CpManagerCache.java         | 12 +--
 .../corepersistence/CpRelationManager.java      | 93 +++++++++++++-------
 .../CpEntityIndexDeleteListenerTest.java        | 12 ++-
 .../core/scope/ApplicationScopeImpl.java        |  6 +-
 .../serialization/EdgeSerializationTest.java    |  3 +
 .../index/impl/EsEntityIndexBatchImpl.java      | 42 +++++++--
 .../index/impl/EsEntityIndexImpl.java           |  2 +-
 .../index/utils/IndexValidationUtils.java       | 10 +++
 11 files changed, 177 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
index f8b28ac..cf3f4a6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListener.java
@@ -59,13 +59,13 @@ public class CpEntityIndexDeleteListener {
 
 
     public Observable<EntityVersion> receive(final MvccEntityDeleteEvent event) {
-        CollectionScope collectionScope = event.getCollectionScope();
-        IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(), collectionScope.getOwner(), collectionScope.getName());
+        final CollectionScope collectionScope = event.getCollectionScope();
+        final IndexScope indexScope = new IndexScopeImpl(collectionScope.getApplication(), collectionScope.getOwner(), collectionScope.getName());
         final EntityIndex entityIndex = entityIndexFactory.createEntityIndex(indexScope);
         return Observable.create(new ObservableIterator<CandidateResult>("deleteEsIndexVersions") {
             @Override
             protected Iterator<CandidateResult> getIterator() {
-                CandidateResults results = entityIndex.getEntityVersions(event.getEntity().getId());
+                CandidateResults results = entityIndex.getEntityVersions(indexScope, event.getEntity().getId());
                 return results.iterator();
             }
         }).subscribeOn(Schedulers.io())
@@ -78,7 +78,7 @@ public class CpEntityIndexDeleteListener {
                             //filter find entities <= current version
                             if (entity.getVersion().timestamp() <= event.getVersion().timestamp()) {
                                 versions.add(entity);
-                                entityIndex.deindex(entity.getId(), entity.getVersion());
+                                entityIndex.createBatch().deindex(indexScope, entity.getId(), entity.getVersion());
                             }
                         }
                         return Observable.from(versions);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index f85ba30..a5475e7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -74,6 +74,7 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
 import org.apache.usergrid.persistence.index.query.CounterResolution;
@@ -561,6 +562,12 @@ public class CpEntityManager implements EntityManager {
             logger.debug( "Deleting indexes of all {} collections owning the entity", 
                     owners.keySet().size() );
 
+            final  EntityIndex ei = managerCache.getEntityIndex( appScope );
+
+            final EntityIndexBatch batch = ei.createBatch();
+
+
+
             for ( String ownerType : owners.keySet() ) {
                 Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType );
 
@@ -573,27 +580,30 @@ public class CpEntityManager implements EntityManager {
                                 new SimpleId( uuid, ownerType ), 
                                 CpEntityManager.getCollectionScopeNameFromCollectionName(coll) );
 
-                        EntityIndex ei = managerCache.getEntityIndex( indexScope );
 
-                        ei.deindex( entity );
+                        batch.index( indexScope, entity );
                     }
                 }
             }
 
+
+
             // deindex from default index scope
             IndexScope defaultIndexScope = new IndexScopeImpl( 
                     appScope.getApplication(), 
                     appScope.getApplication(),
                     getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-            EntityIndex entityIndex = managerCache.getEntityIndex( defaultIndexScope );
-            entityIndex.deindex( entity );
+
+            batch.deindex(defaultIndexScope,  entity );
 
             IndexScope allTypesIndexScope = new IndexScopeImpl( 
                 appScope.getApplication(), 
                 appScope.getApplication(), 
                 ALL_TYPES);
-            EntityIndex aei = managerCache.getEntityIndex( allTypesIndexScope );
-            aei.deindex( entity );
+
+            batch.deindex( allTypesIndexScope,  entity );
+
+            batch.execute();
 
             decrementEntityCollection( Schema.defaultCollectionName( entityId.getType() ) );
 
@@ -980,7 +990,7 @@ public class CpEntityManager implements EntityManager {
                 getCollectionScopeNameFromEntityType( entityRef.getType()) );
 
         EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
-        EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
+        EntityIndex ei = managerCache.getEntityIndex( appScope );
 
         Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
 
@@ -1002,7 +1012,8 @@ public class CpEntityManager implements EntityManager {
         logger.debug("Wrote {}:{} version {}", new Object[] { 
             cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
 
-        ei.index( cpEntity );
+
+        ei.createBatch().index(defaultIndexScope, cpEntity ).execute();
 
         // update in all containing collections and connection indexes
         CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
@@ -2530,12 +2541,13 @@ public class CpEntityManager implements EntityManager {
         }
 
         // Index CP entity into default collection scope
-        IndexScope defaultIndexScope = new IndexScopeImpl( 
-            appScope.getApplication(), 
-            appScope.getApplication(), 
-            CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
-        EntityIndex ei = managerCache.getEntityIndex( defaultIndexScope );
-        ei.index( cpEntity );
+//        IndexScope defaultIndexScope = new IndexScopeImpl(
+//            appScope.getApplication(),
+//            appScope.getApplication(),
+//            CpEntityManager.getCollectionScopeNameFromEntityType( entity.getType() ) );
+//        EntityIndex ei = managerCache.getEntityIndex( appScope );
+//
+//        ei.createBatch().index( defaultIndexScope,  cpEntity ).execute();
 
         // reflect changes in the legacy Entity
         entity.setUuid( cpEntity.getId().getUuid() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 3e9a6a9..090a4d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -50,6 +50,7 @@ import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsExcept
 import org.apache.usergrid.persistence.exceptions.DuplicateUniquePropertyExistsException;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -263,8 +264,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                     .getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
 
             orgInfoEntity = ecm.write( orgInfoEntity ).toBlockingObservable().last();
-            eci.index( orgInfoEntity );
-            eci.refresh();
+            eci.createBatch().index(SYSTEM_ORGS_INDEX_SCOPE,  orgInfoEntity ).executeAndRefresh();
         }
 
         if ( properties == null ) {
@@ -288,7 +288,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                     .getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
 
             appInfoEntity = ecm.write( appInfoEntity ).toBlockingObservable().last();
-            eci.index( appInfoEntity );
+            eci.createBatch().index(SYSTEM_APPS_INDEX_SCOPE,  appInfoEntity ).executeAndRefresh();
             eci.refresh();
         }
 
@@ -325,7 +325,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         Query q = Query.fromQL(PROPERTY_NAME + " = '" + name + "'");
 
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_ORGS_INDEX_SCOPE );
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_ORGS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return null; 
@@ -342,7 +342,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_APPS_INDEX_SCOPE );
         
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return null; 
@@ -371,7 +371,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             Query q = Query.fromQL("select *");
             q.setCursor( cursor );
 
-            CandidateResults results = ei.search( q );
+            CandidateResults results = ei.search(SYSTEM_APPS_INDEX_SCOPE,  q );
             cursor = results.getCursor();
 
             Iterator<CandidateResult> iter = results.iterator();
@@ -417,7 +417,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         Query q = Query.fromQL("select *");
 
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
 
         if ( results.isEmpty() ) {
             return new HashMap<String,String>();
@@ -447,7 +447,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
             .getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
         Entity propsEntity;
         if ( !results.isEmpty() ) {
             propsEntity = em.load( results.iterator().next().getId()).toBlockingObservable().last();
@@ -464,7 +464,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
 
         propsEntity = em.write( propsEntity ).toBlockingObservable().last();
-        ei.index( propsEntity );    
+        ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity );
 
         return true;
     }
@@ -485,7 +485,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityIndex ei = getManagerCache().getEntityIndex( SYSTEM_PROPS_INDEX_SCOPE );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(SYSTEM_PROPS_INDEX_SCOPE,  q );
 
         Entity propsEntity = em.load( 
                 results.iterator().next().getId() ).toBlockingObservable().last();
@@ -501,7 +501,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         propsEntity.removeField( name );
 
         propsEntity = em.write( propsEntity ).toBlockingObservable().last();
-        ei.index( propsEntity );    
+        ei.createBatch().index( SYSTEM_PROPS_INDEX_SCOPE, propsEntity );
 
         return true;
     }
@@ -616,9 +616,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         EntityIndex ei = managerCache.getEntityIndex( is );
 
         Query q = Query.fromQL("select *");
-        CandidateResults results = ei.search( q );
+        CandidateResults results = ei.search(is,  q );
 
-        Map<String, UUID> appMap = new HashMap<String, UUID>();
+        int count = 0;
+        final EntityIndexBatch batch = ei.createBatch();
 
         Iterator<CandidateResult> iter = results.iterator();
         while (iter.hasNext()) {
@@ -643,7 +644,11 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 }
             );
 
-            ei.index(entity);
+            batch.index(is, entity);
+
+
+
+            count++;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index 2f8bd3f..0e7c084 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -36,8 +36,8 @@ class CpManagerCache {
     private final LRUCache2<CollectionScope, EntityCollectionManager> ecmCache
             = new LRUCache2<CollectionScope, EntityCollectionManager>(50, 1 * 60 * 60 * 1000);
 
-    private final LRUCache2<IndexScope, EntityIndex> eiCache
-            = new LRUCache2<IndexScope, EntityIndex>(50, 1 * 60 * 60 * 1000);
+    private final LRUCache2<ApplicationScope, EntityIndex> eiCache
+            = new LRUCache2<>(50, 1 * 60 * 60 * 1000);
 
     private final LRUCache2<ApplicationScope, GraphManager> gmCache
             = new LRUCache2<ApplicationScope, GraphManager>(50, 1 * 60 * 60 * 1000);
@@ -61,13 +61,13 @@ class CpManagerCache {
         return ecm;
     }
 
-    public EntityIndex getEntityIndex(IndexScope indexScope) {
+    public EntityIndex getEntityIndex(ApplicationScope applicationScope) {
 
-        EntityIndex ei = eiCache.get(indexScope);
+        EntityIndex ei = eiCache.get(applicationScope);
 
         if (ei == null) {
-            ei = eif.createEntityIndex(indexScope);
-            eiCache.put(indexScope, ei);
+            ei = eif.createEntityIndex(applicationScope);
+            eiCache.put(applicationScope, ei);
         }
         return ei;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3486fbc..260e429 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -425,6 +426,11 @@ public class CpRelationManager implements RelationManager {
 
         // loop through all types of edge to target
         int count = 0;
+
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch entityIndexBatch = ei.createBatch();
+
         while ( edgeTypesToTarget.hasNext() ) {
 
             // get all edges of the type
@@ -460,10 +466,9 @@ public class CpRelationManager implements RelationManager {
                         applicationScope.getApplication(),
                         new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
                         CpEntityManager.getConnectionScopeName( cpHeadEntity.getId().getType(), connName ));
-                } 
-           
-                EntityIndex ei = managerCache.getEntityIndex(indexScope);
-                ei.index(cpEntity);
+                }
+
+                entityIndexBatch.index(indexScope, cpEntity);
 
                 // reindex the entity in the source entity's all-types index
                 
@@ -471,8 +476,8 @@ public class CpRelationManager implements RelationManager {
                     applicationScope.getApplication(),
                     new SimpleId(sourceEntity.getUuid(), sourceEntity.getType()),
                     ALL_TYPES);
-                ei = managerCache.getEntityIndex(indexScope);
-                ei.index(cpEntity);
+
+                entityIndexBatch.index(indexScope, cpEntity);
 
                 count++;
             }
@@ -659,6 +664,7 @@ public class CpRelationManager implements RelationManager {
             applicationScope.getApplication(), 
             applicationScope.getApplication(), 
             CpEntityManager.getCollectionScopeNameFromEntityType( itemRef.getType()));
+
         EntityCollectionManager memberMgr = managerCache.getEntityCollectionManager(memberScope);
 
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
@@ -700,29 +706,36 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.writeEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex index = managerCache.getEntityIndex( applicationScope );
+
+        final EntityIndexBatch batch = index.createBatch();
+
+
         // index member into entity collection | type scope
         IndexScope collectionIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex collectionIndex = managerCache.getEntityIndex(collectionIndexScope);
-        collectionIndex.index( memberEntity );
+
+        batch.index(collectionIndexScope, memberEntity );
 
         // index member into entity | all-types scope
         IndexScope entityAllTypesScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             ALL_TYPES);
-        EntityIndex entityAllCollectionIndex = managerCache.getEntityIndex(entityAllTypesScope);
-        entityAllCollectionIndex.index( memberEntity );
+
+        batch.index(entityAllTypesScope, memberEntity );
 
         // index member into application | all-types scope
         IndexScope appAllTypesScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             applicationScope.getApplication(), 
             ALL_TYPES);
-        EntityIndex allCollectionIndex = managerCache.getEntityIndex(appAllTypesScope);
-        allCollectionIndex.index( memberEntity );
+
+        batch.index( appAllTypesScope,  memberEntity );
+
+        batch.execute();
 
         logger.debug("Added entity {}:{} to collection {}", new String[] { 
             itemRef.getUuid().toString(), itemRef.getType(), collName }); 
@@ -844,13 +857,16 @@ public class CpRelationManager implements RelationManager {
         org.apache.usergrid.persistence.model.entity.Entity memberEntity = memberMgr.load(
             new SimpleId( itemRef.getUuid(), itemRef.getType() )).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        final EntityIndexBatch batch = ei.createBatch();
+
         // remove item from collection index
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
-        ei.deindex( memberEntity );
+
+        batch.deindex(indexScope,  memberEntity );
 
         // remove collection from item index 
         IndexScope itemScope = new IndexScopeImpl(
@@ -858,8 +874,11 @@ public class CpRelationManager implements RelationManager {
             memberEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( 
                 Schema.defaultCollectionName( cpHeadEntity.getId().getType() )));
-        ei = managerCache.getEntityIndex(itemScope);
-        ei.deindex( cpHeadEntity );
+
+
+        batch.deindex(itemScope,  cpHeadEntity );
+
+        batch.execute();
 
         // remove edge from collection to item 
         GraphManager gm = managerCache.getGraphManager(applicationScope);
@@ -958,7 +977,8 @@ public class CpRelationManager implements RelationManager {
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getCollectionScopeNameFromCollectionName( collName ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
       
         logger.debug("Searching scope {}:{}:{}",
             new String[] { 
@@ -969,7 +989,7 @@ public class CpRelationManager implements RelationManager {
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
 
-        CandidateResults crs = ei.search( query );
+        CandidateResults crs = ei.search(indexScope,  query );
 
         return buildResults( query, crs, collName );
 
@@ -1088,21 +1108,25 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.writeEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+        final EntityIndexBatch batch = ei.createBatch();
+
         // Index the new connection in app|source|type context
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             CpEntityManager.getConnectionScopeName( connectedEntityRef.getType(), connectionType ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
-        ei.index( targetEntity );
+       batch.index( indexScope, targetEntity );
 
         // Index the new connection in app|scope|all-types context
         IndexScope allTypesIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             cpHeadEntity.getId(), 
             ALL_TYPES);
-        EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
-        aei.index( targetEntity );
+
+        batch.index( allTypesIndexScope, targetEntity );
+
+        batch.execute();
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
         Mutator<ByteBuffer> m = createMutator( ko, be );
@@ -1307,21 +1331,23 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager(applicationScope);
         gm.deleteEdge(edge).toBlockingObservable().last();
 
+        final EntityIndex ei = managerCache.getEntityIndex( applicationScope )  ;
+        final EntityIndexBatch batch = ei.createBatch();
+
         // Deindex the connection in app|source|type context
         IndexScope indexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
             CpEntityManager.getConnectionScopeName( targetEntity.getId().getType(), connectionType ));
-        EntityIndex ei = managerCache.getEntityIndex( indexScope );
-        ei.deindex( targetEntity );
+        batch.deindex( indexScope , targetEntity );
 
         // Deindex the connection in app|source|type context
         IndexScope allTypesIndexScope = new IndexScopeImpl(
             applicationScope.getApplication(), 
             new SimpleId( connectingEntityRef.getUuid(), connectingEntityRef.getType() ),
             ALL_TYPES);
-        EntityIndex aei = managerCache.getEntityIndex(allTypesIndexScope);
-        aei.deindex( targetEntity );
+
+        batch.deindex( allTypesIndexScope,  targetEntity );
 
     }
 
@@ -1377,7 +1403,9 @@ public class CpRelationManager implements RelationManager {
                 applicationScope.getApplication(), 
                 cpHeadEntity.getId(), 
                 scopeName);
-            EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+            final EntityIndex ei = managerCache.getEntityIndex(applicationScope);
+
         
             logger.debug("Searching connected entities from scope {}:{}:{}", new String[] { 
                 indexScope.getApplication().toString(), 
@@ -1385,7 +1413,7 @@ public class CpRelationManager implements RelationManager {
                 indexScope.getName()}); 
 
             query = adjustQuery( query );
-            CandidateResults crs = ei.search( query );
+            CandidateResults crs = ei.search( indexScope, query );
 
             raw = buildResults( query , crs, query.getConnectionType() );
         }
@@ -1480,7 +1508,8 @@ public class CpRelationManager implements RelationManager {
                 applicationScope.getApplication(), 
                 cpHeadEntity.getId(), 
                 ALL_TYPES);
-            EntityIndex ei = managerCache.getEntityIndex(indexScope);
+
+            EntityIndex ei = managerCache.getEntityIndex(applicationScope);
         
             logger.debug("Searching connections from the all-types scope {}:{}:{}", new String[] { 
                 indexScope.getApplication().toString(), 
@@ -1488,7 +1517,7 @@ public class CpRelationManager implements RelationManager {
                 indexScope.getName()}); 
 
             query = adjustQuery( query );
-            CandidateResults crs = ei.search( query );
+            CandidateResults crs = ei.search(indexScope,  query );
 
             return buildConnectionResults(query , crs, query.getConnectionType() );
         }
@@ -1498,7 +1527,7 @@ public class CpRelationManager implements RelationManager {
             cpHeadEntity.getId(), 
             CpEntityManager.getConnectionScopeName( 
                     query.getEntityType(), query.getConnectionType() ));
-        EntityIndex ei = managerCache.getEntityIndex(indexScope);
+        EntityIndex ei = managerCache.getEntityIndex(applicationScope);
     
         logger.debug("Searching connections from the '{}' scope {}:{}:{}", new String[] { 
             indexScope.getApplication().toString(), 
@@ -1506,7 +1535,7 @@ public class CpRelationManager implements RelationManager {
             indexScope.getName()}); 
 
         query = adjustQuery( query );
-        CandidateResults crs = ei.search( query );
+        CandidateResults crs = ei.search( indexScope, query );
 
         return buildConnectionResults(query , crs, query.getConnectionType() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
index 6ee62c6..d59432b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/CpEntityIndexDeleteListenerTest.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImp
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.core.entity.EntityVersion;
 import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
@@ -82,6 +83,10 @@ public class CpEntityIndexDeleteListenerTest {
         when(scope.getApplication()).thenReturn(entityId);
         when(eif.createEntityIndex(any(IndexScope.class))).thenReturn(entityIndex);
 
+        final EntityIndexBatch batch = mock(EntityIndexBatch.class);
+
+        when(entityIndex.createBatch()).thenReturn( batch );
+
         CandidateResults results = mock(CandidateResults.class);
         List<CandidateResult> resultsList  = new ArrayList<>();
         resultsList.add(entity);
@@ -90,7 +95,7 @@ public class CpEntityIndexDeleteListenerTest {
         when(results.iterator()).thenReturn(entities);
         when(serializationFig.getBufferSize()).thenReturn(10);
         when(serializationFig.getHistorySize()).thenReturn(20);
-        when(entityIndex.getEntityVersions(entityId)).thenReturn(results);
+        when(entityIndex.getEntityVersions(any(IndexScope.class), entityId)).thenReturn(results);
         MvccEntity mvccEntity = new MvccEntityImpl(entityId,uuid, MvccEntity.Status.COMPLETE,mock(Entity.class));
 
 
@@ -98,6 +103,9 @@ public class CpEntityIndexDeleteListenerTest {
         Observable<EntityVersion> o = esEntityIndexDeleteListener.receive(event);
         EntityVersion testEntity = o.toBlocking().last();
         assertEquals(testEntity.getId(),mvccEntity.getId());
-        verify(entityIndex).deindex(entity.getId(),entity.getVersion());
+
+        verify(entityIndex).createBatch();
+
+        verify(batch).deindex(any(IndexScope.class), entity.getId(),entity.getVersion());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
index 4e067c2..e8dbb02 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationScopeImpl.java
@@ -48,13 +48,13 @@ public class ApplicationScopeImpl implements ApplicationScope {
         if ( this == o ) {
             return true;
         }
-        if ( !( o instanceof ApplicationScopeImpl ) ) {
+        if ( !( o instanceof ApplicationScope ) ) {
             return false;
         }
 
-        final ApplicationScopeImpl that = ( ApplicationScopeImpl ) o;
+        final ApplicationScope that = ( ApplicationScope ) o;
 
-        if ( !application.equals( that.application ) ) {
+        if ( !application.equals( that.getApplication() ) ) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
index 9c29d56..57391de 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationTest.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 
 import org.junit.Before;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -656,6 +657,7 @@ public abstract class EdgeSerializationTest {
      * Test paging by resuming the search from the edge
      */
     @Test
+    @Ignore("Kills embedded cassandra")
     public void pageIteration() throws ConnectionException {
 
         int size = graphFig.getScanPageSize() * 2;
@@ -695,6 +697,7 @@ public abstract class EdgeSerializationTest {
      * edge types
      */
     @Test
+    @Ignore("Kills embedded cassandra")
     public void testIteratorPaging() throws ConnectionException {
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index d9857f2..93f0e41 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -42,6 +42,7 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.query.CandidateResult;
+import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.field.ArrayField;
@@ -91,15 +92,20 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     private BulkRequestBuilder bulkRequest;
 
+    private final int autoFlushSize;
 
-    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope,
-                                   final Client client, final IndexFig config, final Set<String> knownTypes ) {
+    private int count;
+
+
+    public EsEntityIndexBatchImpl( final ApplicationScope applicationScope, final Client client, final IndexFig config,
+                                   final Set<String> knownTypes, final int autoFlushSize ) {
 
         this.applicationScope = applicationScope;
         this.client = client;
         this.knownTypes = knownTypes;
         this.indexName = createIndexName( config.getIndexPrefix(), applicationScope );
         this.refresh = config.isForcedRefresh();
+        this.autoFlushSize = autoFlushSize;
         initBatch();
     }
 
@@ -107,6 +113,9 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public EntityIndexBatch index( final IndexScope indexScope, final Entity entity ) {
 
+
+        IndexValidationUtils.validateScopeMatch( indexScope, applicationScope );
+
         final String indexType = createCollectionScopeTypeName( indexScope );
 
         if ( log.isDebugEnabled() ) {
@@ -118,7 +127,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         ValidationUtils.verifyEntityWrite( entity );
 
-        initType(indexScope,  indexType );
+        initType( indexScope, indexType );
 
         Map<String, Object> entityAsMap = entityToMap( entity );
 
@@ -135,12 +144,16 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         bulkRequest.add( client.prepareIndex( indexName, indexType, indexId ).setSource( entityAsMap ) );
 
+        maybeFlush();
+
         return this;
     }
 
 
     @Override
-    public EntityIndexBatch deindex(final IndexScope indexScope, final Id id, final UUID version ) {
+    public EntityIndexBatch deindex( final IndexScope indexScope, final Id id, final UUID version ) {
+
+        IndexValidationUtils.validateScopeMatch( indexScope, applicationScope );
 
         final String indexType = createCollectionScopeTypeName( indexScope );
 
@@ -158,6 +171,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
         log.debug( "Deindexed Entity with index id " + indexId );
 
+        maybeFlush();
+
         return this;
     }
 
@@ -165,14 +180,14 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
     @Override
     public EntityIndexBatch deindex( final IndexScope indexScope, final Entity entity ) {
 
-       return  deindex(indexScope,  entity.getId(), entity.getVersion() );
+        return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
 
     @Override
     public EntityIndexBatch deindex( final IndexScope indexScope, final CandidateResult entity ) {
 
-        return deindex(indexScope,  entity.getId(), entity.getVersion() );
+        return deindex( indexScope, entity.getId(), entity.getVersion() );
     }
 
 
@@ -184,7 +199,6 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     /**
      * Execute the request, check for errors, then re-init the batch for future use
-     * @param request
      */
     private void execute( final BulkRequestBuilder request ) {
         final BulkResponse response = request.execute().actionGet();
@@ -199,7 +213,17 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
 
     @Override
     public void executeAndRefresh() {
-        execute(bulkRequest.setRefresh( true ) );
+        execute( bulkRequest.setRefresh( true ) );
+    }
+
+
+    private void maybeFlush() {
+        count++;
+
+        if ( count % autoFlushSize == 0 ) {
+            execute();
+            count = 0;
+        }
     }
 
 
@@ -217,6 +241,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
         try {
             XContentBuilder mxcb = EsEntityIndexImpl.createDoubleStringIndexMapping( jsonBuilder(), typeName );
 
+
+            //TODO Dave can this be collapsed into the build as well?
             admin.indices().preparePutMapping( indexName ).setType( typeName ).setSource( mxcb ).execute().actionGet();
 
             admin.indices().prepareGetMappings( indexName ).addTypes( typeName ).execute().actionGet();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 077036a..56316d8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -134,7 +134,7 @@ public class EsEntityIndexImpl implements EntityIndex {
 
     @Override
     public EntityIndexBatch createBatch() {
-        return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes );
+        return new EsEntityIndexBatchImpl( applicationScope, client, config, knownTypes, 1000 );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bba08ddc/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
index 899e7b0..d6080de 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/IndexValidationUtils.java
@@ -19,6 +19,7 @@
 package org.apache.usergrid.persistence.index.utils;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.IndexScope;
 
 import com.google.common.base.Preconditions;
@@ -50,4 +51,13 @@ public class IndexValidationUtils {
     }
 
 
+    /**
+     * Validate the scope in the index matches the application scope
+     * @param indexScope
+     * @param scope
+     */
+    public static void validateScopeMatch(final IndexScope indexScope,final ApplicationScope scope){
+        Preconditions.checkArgument( scope.equals( indexScope ) );
+    }
+
 }