You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/22 20:21:15 UTC

[1/3] incubator-usergrid git commit: Removed inner observable and converted it into regular code that functions. Will convert back in next push.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 ed317b295 -> 630cb4a87


Removed inner observable and converted it into regular code that functions. Will convert back in next push.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/11648ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/11648ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/11648ab1

Branch: refs/heads/USERGRID-608
Commit: 11648ab1c84be349a45020e0cc4a9cafd60d005b
Parents: ed317b2
Author: GERey <gr...@apigee.com>
Authored: Mon May 18 15:08:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Mon May 18 15:08:15 2015 -0700

----------------------------------------------------------------------
 .../mvcc/stage/delete/UniqueCleanup.java        | 106 ++++++++++++-------
 1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/11648ab1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 0034f03..3e3e531 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -88,44 +88,76 @@ public class UniqueCleanup
                 final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
                 final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
 
+                Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields(
+                    applicationScope, entityId );
+
+                final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+                while(uniqueFields.hasNext()){
+                    UniqueValue uniqueValue = uniqueFields.next();
+                    final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+                    //TODO: should this be equals? That way we clean up the one marked as well
+                    if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion ) >= 0){
+                        logger
+                            .debug( "Deleting value:{} from application scope: {} ", uniqueValue, applicationScope );
+                        uniqueCleanupBatch.mergeShallow(
+                            uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue ));
+
+                    }
+                }
+
+                try {
+                    uniqueCleanupBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute batch mutation", e );
+                }
+
+
+
                 //TODO Refactor this logic into a a class that can be invoked from anywhere
-                //iterate all unique values
-                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
-                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
-                        @Override
-                        protected Iterator<UniqueValue> getIterator() {
-                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
-                        }
-                    } )
-
-                        //skip  versions > the specified version
-                        .skipWhile( uniqueValue -> {
-
-                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
-
-                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
-                        } )
-
-                            //buffer our buffer size, then roll them all up in a single batch mutation
-                        .buffer( serializationFig.getBufferSize() )
-
-                            //roll them up
-                        .doOnNext( uniqueValues -> {
-                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                            for ( UniqueValue value : uniqueValues ) {
-                                uniqueCleanupBatch
-                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
-                            }
-
-                            try {
-                                uniqueCleanupBatch.execute();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new RuntimeException( "Unable to execute batch mutation", e );
-                            }
-                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+//                //iterate all unique values
+//                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+//                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+//                        @Override
+//                        protected Iterator<UniqueValue> getIterator() {
+//                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+//                        }
+//                    } )
+//
+//                        //skip  versions > the specified version
+//                        //TODO: does this emit for every version before the staticComparator?
+//                        .skipWhile( uniqueValue -> {
+//
+//                            logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
+//                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+//                            //TODO: should this be equals? That way we clean up the one marked as well
+//                            return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion ) > 0;
+//                        } )
+//
+//                            //buffer our buffer size, then roll them all up in a single batch mutation
+//                        .buffer( serializationFig.getBufferSize() )
+//
+//                            //roll them up
+//                        .doOnNext( uniqueValues -> {
+//                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+//
+//
+//                            for ( UniqueValue value : uniqueValues ) {
+//                                logger
+//                                    .debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
+//                                uniqueCleanupBatch
+//                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+//                            }
+//
+//                            try {
+//                                uniqueCleanupBatch.execute();
+//                            }
+//                            catch ( ConnectionException e ) {
+//                                throw new RuntimeException( "Unable to execute batch mutation", e );
+//                            }
+//                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
             } );
 
         return ObservableTimer.time( outputObservable, uniqueCleanupTimer );


[3/3] incubator-usergrid git commit: [USERGRID-677] Added revised test for the scrolling query. Added fixes to method so that it properly logs aggregation and searching for versions.

Posted by gr...@apache.org.
[USERGRID-677] Added revised test for the scrolling query.
Added fixes to method so that it properly logs aggregation and searching for versions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/630cb4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/630cb4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/630cb4a8

Branch: refs/heads/USERGRID-608
Commit: 630cb4a87e0943d03b9d515166b1529b4cd649d6
Parents: cf823c4
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 11:21:09 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 11:21:09 2015 -0700

----------------------------------------------------------------------
 .../impl/EsApplicationEntityIndexImpl.java      | 43 +++++++++++------
 .../persistence/index/impl/EntityIndexTest.java | 49 +++++++++++++++++++-
 2 files changed, 75 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index d4ed36a..ad47348 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
 import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.FilterBuilders;
@@ -227,16 +228,11 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
 
     @Override
-    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit,
-                                                           final int offset ) {
+    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion) {
 
-        /**
-         * Take a list of IndexEdge, with an entityId
-         and query Es directly for matches
-
-         */
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+        //TODO: check to see if there is some version verifcation. I know there is but i forget where.
+        Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
 
         SearchResponse searchResponse;
 
@@ -246,14 +242,21 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
         final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
 
-        FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+        //I can't just search on the entity Id.
+        FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
             IndexingUtils.idString( entityId ) );
-        srb.setPostFilter( termFilter );
+
+        FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
+
+            //aggregate the filters into the and filder and feed that in.
+        FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter  );
+
+        srb.setPostFilter(andFilter);
 
 
 
         if ( logger.isDebugEnabled() ) {
-            logger.debug( "Searching for edge index (read alias): {}\n  nodeId: {},\n   query: {} ",
+            logger.debug( "Searching for marked versions in index (read alias): {}\n  nodeId: {},\n   query: {} ",
                 this.alias.getReadAlias(),entityId, srb );
         }
 
@@ -264,6 +267,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             //REfactor this out and have it return a modified parseResults that will create the candidateResults from
             //the hit results and then keep that
             //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+            //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
+            //TODO: review this and make them not magic numbers when acking this PR.
             searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
 
             //list that will hold all of the search hits
@@ -273,9 +278,17 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
                 //add search result hits to some sort of running tally of hits.
                 candidates = aggregateScrollResults( candidates, searchResponse );
 
-                searchResponse = searchRequestBuilderStrategyV2
+                SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
                     .getScrollBuilder( searchResponse.getScrollId() )
-                    .setScroll( new TimeValue( 6000 ) ).execute().actionGet();
+                    .setScroll( new TimeValue( 6000 ) );
+
+                //TODO: figure out how to log exactly what we're putting into elasticsearch
+//                if ( logger.isDebugEnabled() ) {
+//                    logger.debug( "Scroll search using query: {} ",
+//                        ssrb.toString() );
+//                }
+
+                searchResponse = ssrb.execute().actionGet();
 
                 if (searchResponse.getHits().getHits().length == 0) {
                     break;
@@ -385,8 +398,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         final SearchHits searchHits = searchResponse.getHits();
         final SearchHit[] hits = searchHits.getHits();
 
-        logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
-
         for ( SearchHit hit : hits ) {
 
             final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
@@ -394,6 +405,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
             candidates.add( candidateResult );
         }
 
+        logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );
+
         return  candidates;
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index df0c701..79fc14a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -298,7 +298,7 @@ public class EntityIndexTest extends BaseIT {
         insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 0 );
 
 
-        ei.addIndex("v2", 1, 0, "one");
+        ei.addIndex( "v2", 1, 0, "one" );
         ei.refreshAsync().toBlocking().first();
 
         insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 1 );
@@ -325,7 +325,7 @@ public class EntityIndexTest extends BaseIT {
         batch.execute().toBlocking().last();
         IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  ei.refreshAsync().toBlocking().first();
         long time = info.getExecutionTime();
-        log.info("refresh took ms:"+time);
+        log.info( "refresh took ms:" + time );
     }
 
 
@@ -405,6 +405,51 @@ public class EntityIndexTest extends BaseIT {
     }
 
 
+    /**
+     * Tests that we aggregate results only before the halfway version point.
+     */
+    @Test
+    public void testScollingDeindex() {
+
+        int numberOfEntities = 1000;
+        int versionToSearchFor = numberOfEntities / 2;
+
+        Id appId = new SimpleId( "application" );
+
+        ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+        IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, 1 );
+
+        ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex( applicationScope );
+
+        UUID entityUUID = UUID.randomUUID();
+        Id entityId = new SimpleId( "mehCar" );
+
+        Map entityMap = new HashMap() {{
+            put( "name", "Toyota Corolla" );
+            put( "introduced", 1966 );
+            put( "topspeed", 111 );
+        }};
+
+        Entity[] entity = new Entity[numberOfEntities];
+        for(int i = 0; i < numberOfEntities; i++) {
+            entity[i] = EntityIndexMapUtils.fromMap( entityMap );
+            EntityUtils.setId( entity[i], entityId );
+            EntityUtils.setVersion( entity[i], UUIDGenerator.newTimeUUID() );
+            entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID ) );
+
+            //index the new entity. This is where the loop will be set to create like 100 entities.
+            entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last();
+        }
+        ei.refreshAsync().toBlocking().first();
+
+        CandidateResults candidateResults = entityIndex
+            .getAllEntityVersionBeforeMark( entity[versionToSearchFor].getId(), entity[versionToSearchFor].getVersion());
+        assertEquals( 501, candidateResults.size() );
+    }
+
+
+
     private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes,
                                         final ApplicationEntityIndex entityIndex, final String queryString,
                                         final int num ) {


[2/3] incubator-usergrid git commit: [USERGRID-677] Added a new SearchRequestBuilderStrategy that allows you to make term post filters easily without the cruft of the regular SearchRequestBuilderStrategy. Added a version of the scrolling api and an addit

Posted by gr...@apache.org.
[USERGRID-677] Added a new SearchRequestBuilderStrategy that allows you to make term post filters easily without the cruft of the regular SearchRequestBuilderStrategy.
Added a version of the scrolling api and an additional parser that will let us aggregate the from the constant queries and return a single CandidateResults set.
Added a test that currently just checks to make sure the new scrolling search still functions as a regular search ( it does! Yay! ).


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf823c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf823c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf823c47

Branch: refs/heads/USERGRID-608
Commit: cf823c47d3fc2bb92198e7d1e65ecc4be96ccfcc
Parents: 11648ab
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 09:53:44 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 09:53:44 2015 -0700

----------------------------------------------------------------------
 .../SearchRequestBuilderStrategyV2.java         | 69 ++++++++++++++
 .../impl/EsApplicationEntityIndexImpl.java      | 98 ++++++++++++++++++--
 2 files changed, 160 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
new file mode 100644
index 0000000..0511b75
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
@@ -0,0 +1,69 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder;
+
+
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.index.impl.IndexAlias;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+
+
+/**
+ * The class to use when creating new custom queries for specific terms to elasticsearch.
+ */
+public class SearchRequestBuilderStrategyV2 {
+    private static final Logger logger = LoggerFactory.getLogger( SearchRequestBuilderStrategyV2.class );
+
+    private final EsProvider esProvider;
+    private final ApplicationScope applicationScope;
+    private final IndexAlias alias;
+    private final int cursorTimeout;
+
+
+    public SearchRequestBuilderStrategyV2( final EsProvider esProvider, final ApplicationScope applicationScope,
+                                         final IndexAlias alias, int cursorTimeout ) {
+
+        this.esProvider = esProvider;
+        this.applicationScope = applicationScope;
+        this.alias = alias;
+        this.cursorTimeout = cursorTimeout;
+    }
+
+    public SearchRequestBuilder getBuilder(){
+        SearchRequestBuilder srb =
+            esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes( IndexingUtils.ES_ENTITY_TYPE ).setSearchType(
+                SearchType.QUERY_THEN_FETCH);
+
+
+        return srb;
+    }
+
+    public SearchScrollRequestBuilder getScrollBuilder(String scrollId){
+        return esProvider.getClient().prepareSearchScroll( scrollId );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2709569..d4ed36a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -32,6 +32,7 @@ import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.FilterBuilder;
 import org.elasticsearch.index.query.FilterBuilders;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -41,8 +42,6 @@ import org.elasticsearch.search.SearchHits;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.lang.NotImplementedException;
-
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -50,6 +49,7 @@ import org.apache.usergrid.persistence.index.AliasedEntityIndex;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.CandidateResult;
 import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
 import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
@@ -96,6 +96,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
     private final Timer deleteApplicationTimer;
     private final Meter deleteApplicationMeter;
     private final SearchRequestBuilderStrategy searchRequest;
+    private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
     private FailureMonitor failureMonitor;
     private final int cursorTimeout;
     private final long queryTimeout;
@@ -119,7 +120,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         this.esProvider = provider;
 
         mapManager = mapManagerFactory.createMapManager( mapScope );
-        this.searchTimer = metricsFactory.getTimer(EsApplicationEntityIndexImpl.class, "search.timer");
+        this.searchTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.timer" );
         this.cursorTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.cursor.timer" );
         this.cursorTimeout = config.getQueryCursorTimeout();
         this.queryTimeout = config.getWriteTimeout();
@@ -132,6 +133,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         this.alias = indexIdentifier.getAlias();
 
         this.searchRequest = new SearchRequestBuilderStrategy( esProvider, appScope, alias, cursorTimeout );
+        this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, appScope, alias, cursorTimeout  );
     }
 
 
@@ -156,7 +158,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
 
         final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
-                                                      .setTimeout( TimeValue.timeValueMillis(queryTimeout) );
+                                                      .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
 
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Searching index (read alias): {}\n  nodeId: {}, edgeType: {},  \n type: {}\n   query: {} ",
@@ -204,7 +206,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         if ( logger.isDebugEnabled() ) {
             logger.debug( "Searching for edge index (read alias): {}\n  nodeId: {}, edgeType: {},  \n type: {}\n   query: {} ",
                 this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
-                SearchTypes.fromTypes( entityId.getType()), srb );
+                SearchTypes.fromTypes( entityId.getType() ), srb );
         }
 
         try {
@@ -225,9 +227,72 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
 
 
     @Override
-    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit,
+    public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit,
                                                            final int offset ) {
-        throw new NotImplementedException( "Implement me or else I won't work." );
+
+        /**
+         * Take a list of IndexEdge, with an entityId
+         and query Es directly for matches
+
+         */
+        Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+        Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+        SearchResponse searchResponse;
+
+        List<CandidateResult> candidates = new ArrayList<>();
+
+        final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+
+        final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+        FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+            IndexingUtils.idString( entityId ) );
+        srb.setPostFilter( termFilter );
+
+
+
+        if ( logger.isDebugEnabled() ) {
+            logger.debug( "Searching for edge index (read alias): {}\n  nodeId: {},\n   query: {} ",
+                this.alias.getReadAlias(),entityId, srb );
+        }
+
+        try {
+            //Added For Graphite Metrics
+            Timer.Context timeSearch = searchTimer.time();
+
+            //REfactor this out and have it return a modified parseResults that will create the candidateResults from
+            //the hit results and then keep that
+            //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+            searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+            //list that will hold all of the search hits
+
+
+            while(true){
+                //add search result hits to some sort of running tally of hits.
+                candidates = aggregateScrollResults( candidates, searchResponse );
+
+                searchResponse = searchRequestBuilderStrategyV2
+                    .getScrollBuilder( searchResponse.getScrollId() )
+                    .setScroll( new TimeValue( 6000 ) ).execute().actionGet();
+
+                if (searchResponse.getHits().getHits().length == 0) {
+                    break;
+                }
+
+
+            }
+            timeSearch.stop();
+        }
+        catch ( Throwable t ) {
+            logger.error( "Unable to communicate with Elasticsearch", t );
+            failureMonitor.fail( "Unable to execute batch", t );
+            throw t;
+        }
+        failureMonitor.success();
+
+        return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
     }
 
 
@@ -314,4 +379,23 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
         return candidateResults;
     }
 
+    private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
+                                                          final SearchResponse searchResponse ){
+
+        final SearchHits searchHits = searchResponse.getHits();
+        final SearchHit[] hits = searchHits.getHits();
+
+        logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
+
+        for ( SearchHit hit : hits ) {
+
+            final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
+
+            candidates.add( candidateResult );
+        }
+
+        return  candidates;
+
+    }
+
 }