You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/26 21:23:26 UTC

[01/10] incubator-usergrid git commit: USERGRID-457 - Add explicit Content-Length of 0 for POST request with no body.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-501 8a62d1177 -> 2df40137c


USERGRID-457 - Add explicit Content-Length of 0 for POST request with no body.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5937f9fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5937f9fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5937f9fb

Branch: refs/heads/USERGRID-501
Commit: 5937f9fb0618d1f1ca38e817b14c7e25cf8b7bd7
Parents: 61c5f9c
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Mar 19 20:12:45 2015 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Mar 19 20:17:46 2015 -0700

----------------------------------------------------------------------
 .../usergrid/security/providers/PingIdentityProvider.java       | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5937f9fb/stack/services/src/main/java/org/apache/usergrid/security/providers/PingIdentityProvider.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/security/providers/PingIdentityProvider.java b/stack/services/src/main/java/org/apache/usergrid/security/providers/PingIdentityProvider.java
index 34d9b80..3c8d209 100644
--- a/stack/services/src/main/java/org/apache/usergrid/security/providers/PingIdentityProvider.java
+++ b/stack/services/src/main/java/org/apache/usergrid/security/providers/PingIdentityProvider.java
@@ -127,12 +127,9 @@ public class PingIdentityProvider extends AbstractProvider {
                               .queryParam( "grant_type", "urn:pingidentity.com:oauth2:grant_type:validate_bearer" )
                               .queryParam( "client_secret", clientSecret ).queryParam( "client_id", clientId )
                               .queryParam( "token", externalToken ).type( MediaType.APPLICATION_FORM_URLENCODED_TYPE )
+                              .header("Content-Length", "0")
                               .post( JsonNode.class );
 
-        // {"token_type":"urn:pingidentity.com:oauth2:validated_token","expires_in":5383,
-        // "client_id":"dev.app.appservices","access_token":{"subject":"svccastiron@burberry.com",
-        // "client_id":"dev.app.appservices"}}
-
         String rawEmail = node.get( "access_token" ).get( "subject" ).asText();
 
         Map<String, Object> userMap = new HashMap<String, Object>();


[03/10] incubator-usergrid git commit: Updates iteration logic in the management service for iterating apps

Posted by sf...@apache.org.
Updates iteration logic in the management service for iterating apps

Reverts connect back removal.  This is required for dual indexing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/beb2a2a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/beb2a2a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/beb2a2a5

Branch: refs/heads/USERGRID-501
Commit: beb2a2a5337927f13d9a7c563bb936214eda159b
Parents: c7fa864
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 25 16:59:21 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 25 16:59:21 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java        | 13 ++++++-------
 stack/core/src/test/resources/log4j.properties    |  2 +-
 .../cassandra/ManagementServiceImpl.java          | 18 +++++++++++-------
 3 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/beb2a2a5/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 7179baf..da39ea9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -702,13 +702,12 @@ public class CpRelationManager implements RelationManager {
         //            headEntityScope.getOwner().toString(),
         //            headEntityScope.getName()});
 
-        //TODO T.N.  This should even be neccessary any longer, graph maintains 2 edges.  .
-//        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-//            getRelationManager( itemEntity ).addToCollection(
-//                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
-//            getRelationManager( itemEntity ).addToCollection(
-//                    collection.getLinkedCollection(), headEntity, false );
-//        }
+        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
+            getRelationManager( itemEntity ).addToCollection(
+                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
+            getRelationManager( itemEntity ).addToCollection(
+                    collection.getLinkedCollection(), headEntity, false );
+        }
 
         return itemEntity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/beb2a2a5/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index 0ba16ea..3dee8f6 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -45,7 +45,7 @@ log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D
 
 log4j.logger.org.apache.usergrid.corepersistence.migration=WARN
 
-log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO
 #log4j.logger.org.apache.usergrid.corepersistence=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpEntityManagerFactory=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/beb2a2a5/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 854c3e0..0cf80b4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -1477,21 +1477,25 @@ public class ManagementServiceImpl implements ManagementService {
 
         BiMap<UUID, String> organizations = HashBiMap.create();
         EntityManager em = emf.getEntityManager( smf.getManagementAppId() );
-        Results results = em.getCollection( new SimpleEntityRef( User.ENTITY_TYPE, userId ), "groups", null, 10000,
+        Results results = em.getCollection( new SimpleEntityRef( User.ENTITY_TYPE, userId ), "groups", null, 1000,
                 Level.ALL_PROPERTIES, false );
 
         String path = null;
 
-        for ( Entity entity : results.getEntities() ) {
+        do {
+            for ( Entity entity : results.getEntities() ) {
+
+                path = ( String ) entity.getProperty( PROPERTY_PATH );
 
-            path = ( String ) entity.getProperty( PROPERTY_PATH );
+                if ( path != null ) {
+                    path = path.toLowerCase();
+                }
 
-            if ( path != null ) {
-                path = path.toLowerCase();
+                organizations.put( entity.getUuid(), path );
             }
 
-            organizations.put( entity.getUuid(), path );
-        }
+            results = results.getNextPageResults();
+        }while(results != null);
 
         return organizations;
     }


[08/10] incubator-usergrid git commit: add version migration for indexes

Posted by sf...@apache.org.
add version migration for indexes


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c09e6b4d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c09e6b4d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c09e6b4d

Branch: refs/heads/USERGRID-501
Commit: c09e6b4d8d3213c54976184593e6bb592b0c130b
Parents: bf0718e
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 11:27:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 11:27:34 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/migration/IndexDataVersions.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c09e6b4d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
index 921d86a..6558339 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
@@ -21,7 +21,7 @@
 package org.apache.usergrid.persistence.index.migration;
 
 /**
- * Classy class class.
+ * Index data migration
  */
 public enum IndexDataVersions {
     MANY_INDEXES(0),


[09/10] incubator-usergrid git commit: move back to old Query object

Posted by sf...@apache.org.
move back to old Query object


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/96d27b95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/96d27b95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/96d27b95

Branch: refs/heads/USERGRID-501
Commit: 96d27b95336b41aee16180fe80fbf9bc9fb21f4d
Parents: c09e6b4
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 13:34:01 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 13:34:01 2015 -0600

----------------------------------------------------------------------
 .../index/ApplicationEntityIndex.java           |   6 +-
 .../usergrid/persistence/index/EntityIndex.java |   2 +
 .../impl/EsApplicationEntityIndexImpl.java      |   9 +-
 .../impl/SearchRequestBuilderStrategy.java      |  11 +-
 .../usergrid/persistence/index/query/Query.java | 184 ++++++++++++++++---
 5 files changed, 174 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96d27b95/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
index 2167efa..59a19eb 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
@@ -43,11 +43,9 @@ public interface ApplicationEntityIndex {
      * Execute query in Usergrid syntax.
      */
     public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query);
+    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query, final int limit);
+
 
-    /**
-     * Execute query in Usergrid syntax.
-     */
-    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchType, final Query query, final int limit );
 
     /**
      * get next page of results

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96d27b95/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index af6b013..6783fb0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Future;
 public interface EntityIndex extends CPManager {
 
 
+    public static final int MAX_LIMIT = 1000;
+
     /**
      * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
      * @param indexSuffix index name

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96d27b95/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 27791ae..603bee4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -136,16 +136,15 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
     }
 
     @Override
-    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query){
-        return search(indexScope,searchTypes,query,10);
+    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query) {
+        return search(indexScope,searchTypes,query,query.getLimit());
     }
-
     @Override
-    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query, final int limit) {
+    public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query, final int limit){
 
         SearchResponse searchResponse;
 
-        SearchRequestBuilder srb = searchRequest.getBuilder(indexScope, searchTypes, query, limit);
+        SearchRequestBuilder srb = searchRequest.getBuilder(indexScope, searchTypes, query,limit);
 
         if (logger.isDebugEnabled()) {
             logger.debug("Searching index (read alias): {}\n  scope: {} \n type: {}\n   query: {} ",

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96d27b95/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index 15796f9..207a7a8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -22,10 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.index.IndexAlias;
-import org.apache.usergrid.persistence.index.IndexIdentifier;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
@@ -56,7 +53,6 @@ public class SearchRequestBuilderStrategy {
     private final ApplicationScope applicationScope;
     private final IndexAlias alias;
     private final int cursorTimeout;
-    public static final int MAX_LIMIT = 1000;
 
     public SearchRequestBuilderStrategy(final EsProvider esProvider, final ApplicationScope applicationScope, final IndexAlias alias, int cursorTimeout){
 
@@ -66,8 +62,9 @@ public class SearchRequestBuilderStrategy {
         this.cursorTimeout = cursorTimeout;
     }
 
-    public SearchRequestBuilder getBuilder(final IndexScope indexScope, final SearchTypes searchTypes, final Query query, final int limit) {
-        Preconditions.checkArgument(limit <= MAX_LIMIT, "limit is greater than max "+ MAX_LIMIT);
+    public SearchRequestBuilder getBuilder(final IndexScope indexScope, final SearchTypes searchTypes, final Query query,  final int limit) {
+
+        Preconditions.checkArgument(limit <= EntityIndex.MAX_LIMIT, "limit is greater than max "+ EntityIndex.MAX_LIMIT);
 
         SearchRequestBuilder srb = esProvider.getClient().prepareSearch(alias.getReadAlias())
             .setTypes(searchTypes.getTypeNames(applicationScope))

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/96d27b95/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index fa7def4..5567382 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -40,7 +40,10 @@ import org.antlr.runtime.Token;
 import org.antlr.runtime.TokenRewriteStream;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
+import org.apache.usergrid.persistence.index.exceptions.IndexException;
 import org.apache.usergrid.persistence.index.exceptions.QueryParseException;
+import org.apache.usergrid.persistence.index.impl.EsQueryVistor;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
 import org.apache.usergrid.persistence.index.query.tree.AndOperand;
 import org.apache.usergrid.persistence.index.query.tree.ContainsOperand;
 import org.apache.usergrid.persistence.index.query.tree.CpQueryFilterLexer;
@@ -52,10 +55,14 @@ import org.apache.usergrid.persistence.index.query.tree.GreaterThanEqual;
 import org.apache.usergrid.persistence.index.query.tree.LessThan;
 import org.apache.usergrid.persistence.index.query.tree.LessThanEqual;
 import org.apache.usergrid.persistence.index.query.tree.Operand;
+import org.apache.usergrid.persistence.index.query.tree.QueryVisitor;
 import org.apache.usergrid.persistence.index.utils.ClassUtils;
 import org.apache.usergrid.persistence.index.utils.ConversionUtils;
 import org.apache.usergrid.persistence.index.utils.ListUtils;
 import org.apache.usergrid.persistence.index.utils.MapUtils;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +79,9 @@ public class Query {
         IDS, REFS, CORE_PROPERTIES, ALL_PROPERTIES, LINKED_PROPERTIES
     }
 
+    public static final int DEFAULT_LIMIT = 10;
 
+    public static final int MAX_LIMIT = 1000;
 
     public static final String PROPERTY_UUID = "uuid";
 
@@ -80,6 +89,8 @@ public class Query {
     private List<SortPredicate> sortPredicates = new ArrayList<SortPredicate>();
     private Operand rootOperand;
     private UUID startResult;
+    private String cursor;
+    private int limit = 0;
 
     private Map<String, String> selectAssignments = new LinkedHashMap<String, String>();
     private boolean mergeSelectResults = false;
@@ -111,19 +122,22 @@ public class Query {
      * @param q
      */
     public Query( Query q ) {
-        if (q == null) {
+        if ( q == null ) {
             return;
         }
+
         type = q.type;
         sortPredicates = q.sortPredicates != null
-                ? new ArrayList<SortPredicate>(q.sortPredicates) : null;
+            ? new ArrayList<>( q.sortPredicates ) : null;
         startResult = q.startResult;
+        cursor = q.cursor;
+        limit = q.limit;
         selectAssignments = q.selectAssignments != null
-                ? new LinkedHashMap<String, String>(q.selectAssignments) : null;
+            ? new LinkedHashMap<>( q.selectAssignments ) : null;
         mergeSelectResults = q.mergeSelectResults;
         //level = q.level;
         connectionType = q.connectionType;
-        permissions = q.permissions != null ? new ArrayList<String>(q.permissions) : null;
+        permissions = q.permissions != null ? new ArrayList<>( q.permissions ) : null;
         reversed = q.reversed;
         reversedSet = q.reversedSet;
         startTime = q.startTime;
@@ -132,16 +146,76 @@ public class Query {
         pad = q.pad;
         rootOperand = q.rootOperand;
         identifiers = q.identifiers != null
-                ? new ArrayList<Identifier>(q.identifiers) : null;
+            ? new ArrayList<>( q.identifiers ) : null;
         counterFilters = q.counterFilters != null
-                ? new ArrayList<CounterFilterPredicate>(q.counterFilters) : null;
+            ? new ArrayList<>( q.counterFilters ) : null;
         collection = q.collection;
-
         level = q.level;
 
     }
 
 
+    public QueryBuilder createQueryBuilder( final String context ) {
+
+
+        QueryBuilder queryBuilder = null;
+
+
+        //we have a root operand.  Translate our AST into an ES search
+        if ( getRootOperand() != null ) {
+            // In the case of geo only queries, this will return null into the query builder.
+            // Once we start using tiles, we won't need this check any longer, since a geo query
+            // will return a tile query + post filter
+            QueryVisitor v = new EsQueryVistor();
+
+            try {
+                getRootOperand().visit( v );
+            }
+            catch ( IndexException ex ) {
+                throw new RuntimeException( "Error building ElasticSearch query", ex );
+            }
+
+
+            queryBuilder = v.getQueryBuilder();
+        }
+
+
+        // Add our filter for context to our query for fast execution.
+        // Fast because it utilizes bitsets internally. See this post for more detail.
+        // http://www.elasticsearch.org/blog/all-about-elasticsearch-filter-bitsets/
+
+        // TODO evaluate performance when it's an all query.
+        // Do we need to put the context term first for performance?
+        if ( queryBuilder != null ) {
+            queryBuilder = QueryBuilders.boolQuery().must( queryBuilder ).must( QueryBuilders
+                .termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context ) );
+        }
+
+        //nothing was specified ensure we specify the context in the search
+        else {
+            queryBuilder = QueryBuilders.termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context );
+        }
+
+        return queryBuilder;
+    }
+
+
+    public FilterBuilder createFilterBuilder() {
+        FilterBuilder filterBuilder = null;
+
+        if ( getRootOperand() != null ) {
+            QueryVisitor v = new EsQueryVistor();
+            try {
+                getRootOperand().visit( v );
+
+            } catch ( IndexException ex ) {
+                throw new RuntimeException( "Error building ElasticSearch query", ex );
+            }
+            filterBuilder = v.getFilterBuilder();
+        }
+
+        return filterBuilder;
+    }
 
 
     /**
@@ -169,8 +243,8 @@ public class Query {
 
         String qlt = ql.toLowerCase();
         if (       !qlt.startsWith( "select" )
-                && !qlt.startsWith( "insert" )
-                && !qlt.startsWith( "update" ) && !qlt.startsWith( "delete" ) ) {
+            && !qlt.startsWith( "insert" )
+            && !qlt.startsWith( "update" ) && !qlt.startsWith( "delete" ) ) {
 
             if ( qlt.startsWith( "order by" ) ) {
                 ql = "select * " + ql;
@@ -224,7 +298,7 @@ public class Query {
 
         if ( o instanceof Map ) {
             @SuppressWarnings({ "unchecked", "rawtypes" }) Map<String, List<String>> params =
-                    ClassUtils.cast( MapUtils.toMapList( ( Map ) o ) );
+                ClassUtils.cast( MapUtils.toMapList( ( Map ) o ) );
             return fromQueryParams( params );
         }
         return null;
@@ -232,7 +306,7 @@ public class Query {
 
 
     public static Query fromQueryParams( Map<String, List<String>> params )
-            throws QueryParseException {
+        throws QueryParseException {
         Query q = null;
         CounterResolution resolution = null;
         List<Identifier> identifiers = null;
@@ -243,6 +317,8 @@ public class Query {
         Boolean reversed = ListUtils.firstBoolean( params.get( "reversed" ) );
         String connection = ListUtils.first( params.get( "connectionType" ) );
         UUID start = ListUtils.firstUuid( params.get( "start" ) );
+        String cursor = ListUtils.first( params.get( "cursor" ) );
+        Integer limit = ListUtils.firstInteger( params.get( "limit" ) );
         List<String> permissions = params.get( "permission" );
         Long startTime = ListUtils.firstLong( params.get( "start_time" ) );
         Long finishTime = ListUtils.firstLong( params.get( "end_time" ) );
@@ -313,7 +389,15 @@ public class Query {
             q.setStartResult( start );
         }
 
+        if ( cursor != null ) {
+            q = newQueryIfNull( q );
+            q.setCursor( cursor );
+        }
 
+        if ( limit != null ) {
+            q = newQueryIfNull( q );
+            q.setLimit( limit );
+        }
 
         if ( startTime != null ) {
             q = newQueryIfNull( q );
@@ -356,21 +440,22 @@ public class Query {
 
     public static Query searchForProperty( String propertyName, Object propertyValue ) {
         Query q = new Query();
-        q.addEqualityFilter(propertyName, propertyValue);
+        q.addEqualityFilter( propertyName, propertyValue );
         return q;
     }
 
 
     public static Query findForProperty( String propertyName, Object propertyValue ) {
         Query q = new Query();
-        q.addEqualityFilter(propertyName, propertyValue);
+        q.addEqualityFilter( propertyName, propertyValue );
+        q.setLimit( 1 );
         return q;
     }
 
 
     public static Query fromUUID( UUID uuid ) {
         Query q = new Query();
-        q.addIdentifier( Identifier.fromUUID(uuid) );
+        q.addIdentifier( Identifier.fromUUID( uuid ) );
         return q;
     }
 
@@ -629,8 +714,8 @@ public class Query {
         for ( SortPredicate s : sortPredicates ) {
             if ( s.getPropertyName().equals( propertyName ) ) {
                 logger.error(
-                        "Attempted to set sort order for " + s.getPropertyName()
-                                + " more than once, discarding..." );
+                    "Attempted to set sort order for " + s.getPropertyName()
+                        + " more than once, discarding..." );
                 return this;
             }
         }
@@ -727,8 +812,8 @@ public class Query {
     public Query addContainsFilter( String propName, String keyword ) {
         ContainsOperand equality = new ContainsOperand( new ClassicToken( 0, "contains" ) );
 
-        equality.setProperty(propName);
-        equality.setLiteral(keyword);
+        equality.setProperty( propName );
+        equality.setLiteral( keyword );
 
         addClause( equality );
 
@@ -737,8 +822,8 @@ public class Query {
 
 
     private void addClause( EqualityOperand equals, String propertyName, Object value ) {
-        equals.setProperty(propertyName);
-        equals.setLiteral(value);
+        equals.setProperty( propertyName );
+        equals.setLiteral( value );
         addClause( equals );
     }
 
@@ -800,7 +885,7 @@ public class Query {
     }
 
 
-    public UUID getStartResult(String cursor) {
+    public UUID getStartResult() {
         if ( ( startResult == null ) && ( cursor != null ) ) {
             byte[] cursorBytes = Base64.decodeBase64( cursor );
             if ( ( cursorBytes != null ) && ( cursorBytes.length == 16 ) ) {
@@ -811,6 +896,61 @@ public class Query {
     }
 
 
+    public String getCursor() {
+        return cursor;
+    }
+
+
+    public void setCursor( String cursor ) {
+        this.cursor = cursor;
+    }
+
+
+    public Query withCursor( String cursor ) {
+        setCursor( cursor );
+        return this;
+    }
+
+
+    public int getLimit() {
+        return getLimit( DEFAULT_LIMIT );
+    }
+
+
+    public int getLimit( int defaultLimit ) {
+        if ( limit <= 0 ) {
+            if ( defaultLimit > 0 ) {
+                return defaultLimit;
+            }
+            else {
+                return DEFAULT_LIMIT;
+            }
+        }
+        return limit;
+    }
+
+
+    public void setLimit( int limit ) {
+
+        // TODO tnine.  After users have had time to change their query limits,
+        // this needs to be uncommented and enforced.
+        //    if(limit > MAX_LIMIT){
+        //        throw new IllegalArgumentException(
+        //            String.format("Query limit must be <= to %d", MAX_LIMIT));
+        //    }
+
+        if ( limit > MAX_LIMIT ) {
+            limit = MAX_LIMIT;
+        }
+
+        this.limit = limit;
+    }
+
+
+    public Query withLimit( int limit ) {
+        setLimit( limit );
+        return this;
+    }
 
 
     public boolean isReversed() {
@@ -998,7 +1138,7 @@ public class Query {
 
 
         public SortPredicate(@JsonProperty("propertyName")  String propertyName,
-                @JsonProperty("direction")  Query.SortDirection direction ) {
+                             @JsonProperty("direction")  Query.SortDirection direction ) {
 
             if ( propertyName == null ) {
                 throw new NullPointerException( "Property name was null" );


[04/10] incubator-usergrid git commit: Merge branch 'two-dot-o' into two-dot-o-dev

Posted by sf...@apache.org.
Merge branch 'two-dot-o' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6db7ce93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6db7ce93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6db7ce93

Branch: refs/heads/USERGRID-501
Commit: 6db7ce937b008b26f3f06e9a6fbe295b4b8d7dc3
Parents: 66fdc61 beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:31:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:31:31 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 304 ++++++-------------
 .../results/CollectionRefsVerifier.java         |  44 +++
 .../CollectionResultsLoaderFactoryImpl.java     |  60 ++++
 .../results/ConnectionRefsVerifier.java         |  61 ++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 ++++
 .../results/ElasticSearchQueryExecutor.java     | 212 +++++++++++++
 .../corepersistence/results/QueryExecutor.java  |  37 +++
 .../corepersistence/results/RefsVerifier.java   |  42 ---
 .../results/ResultsLoaderFactory.java           |   3 +-
 .../results/ResultsLoaderFactoryImpl.java       |  62 ----
 .../persistence/MultiQueryIterator.java         |   6 +-
 .../apache/usergrid/persistence/Results.java    |  26 +-
 .../cassandra/QueryProcessorImpl.java           |  12 +-
 .../usergrid/persistence/PathQueryIT.java       |  46 ++-
 stack/core/src/test/resources/log4j.properties  |   2 +
 .../persistence/graph/GraphManagerIT.java       |   2 +-
 .../usergrid/persistence/index/IndexFig.java    |  16 +-
 .../index/impl/BufferQueueInMemoryImpl.java     |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  61 ++--
 .../usergrid/persistence/index/query/Query.java |  58 ++--
 .../persistence/index/utils/ListUtils.java      |   3 +-
 .../cassandra/ManagementServiceImpl.java        |  18 +-
 .../providers/PingIdentityProvider.java         |   5 +-
 23 files changed, 731 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index f505fa3,da39ea9..5416f19
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@@ -76,9 -76,11 +78,9 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 -import org.apache.usergrid.persistence.index.EntityIndex;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 -import org.apache.usergrid.persistence.index.IndexScope;
 -import org.apache.usergrid.persistence.index.SearchTypes;
  import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
- import org.apache.usergrid.persistence.index.query.CandidateResult;
- import org.apache.usergrid.persistence.index.query.CandidateResults;
  import org.apache.usergrid.persistence.index.query.Identifier;
  import org.apache.usergrid.persistence.index.query.Query;
  import org.apache.usergrid.persistence.index.query.Query.Level;
@@@ -275,9 -275,9 +275,9 @@@ public class CpRelationManager implemen
          });
  
          Observable<String> types= gm.getEdgeTypesFromSource(
-             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
+             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
  
 -        Iterator<String> iter = types.toBlockingObservable().getIterator();
 +        Iterator<String> iter = types.toBlocking().getIterator();
          while ( iter.hasNext() ) {
              indexes.add( iter.next() );
          }
@@@ -317,59 -318,51 +318,47 @@@
       * @param edgeType Edge type, edge type prefix or null to allow any edge type
       * @param fromEntityType Only consider edges from entities of this type
       */
-     Map<EntityRef, Set<String>> getContainers( int limit, String edgeType, String fromEntityType ) {
- 
-         Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
- 
-         GraphManager gm = managerCache.getGraphManager(applicationScope);
- 
-         Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-             cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
- 
-         logger.debug("getContainers(): "
-                 + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}",
-             new Object[] {
-                 edgeType,
-                 cpHeadEntity.getId().getType(),
-                 cpHeadEntity.getId().getUuid(),
-                 applicationScope.getApplication(),
-                 edgeTypes.hasNext()
-         });
+     Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
  
-         while ( edgeTypes.hasNext() ) {
 -        Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
--
-             String etype = edgeTypes.next();
+         final GraphManager gm = managerCache.getGraphManager( applicationScope );
  
-             Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                 cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+         Observable<Edge> edges =
+             gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
+               .flatMap( new Func1<String, Observable<Edge>>() {
+                   @Override
+                   public Observable<Edge> call( final String edgeType ) {
+                       return gm.loadEdgesToTarget(
+                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+                               SearchByEdgeType.Order.DESCENDING, null ) );
+ 
+                   }
+               } );
+ 
+         //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memory
+         if ( limit > -1 ) {
+             edges = edges.take( limit );
+         }
  
-             Iterator<Edge> iter = edges.toBlocking().getIterator();
-             while ( iter.hasNext() ) {
-                 Edge edge = iter.next();
  
-                 if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
-                     logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
-                     continue;
 -        return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() {
 -            @Override
 -            public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) {
++        return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge) -> {
+                 if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+                     logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
+                     return;
                  }
  
-                 EntityRef eref = new SimpleEntityRef(
-                         edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                 final EntityRef eref =
+                     new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
  
-                 String name = null;
-                 if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+                 String name;
+                 if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
                      name = CpNamingUtils.getConnectionType( edge.getType() );
-                 } else {
+                 }
+                 else {
                      name = CpNamingUtils.getCollectionName( edge.getType() );
                  }
-                 addMapSet( results, eref, name );
+                 addMapSet( entityRefSetMap, eref, name );
              }
- 
-             if ( limit > 0 && results.keySet().size() >= limit ) {
-                 break;
-             }
-         }
- 
-         return results;
 -        } ).toBlocking().last();
++         ).toBlocking().last();
      }
  
  
@@@ -689,18 -679,16 +675,16 @@@
          // create graph edge connection from head entity to member entity
          Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
          GraphManager gm = managerCache.getGraphManager( applicationScope );
 -        gm.writeEdge( edge ).toBlockingObservable().last();
 +        gm.writeEdge( edge ).toBlocking().last();
  
-         logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
-             new Object[] {
-                 edgeType,
-                 cpHeadEntity.getId().getType(),
-                 cpHeadEntity.getId().getUuid(),
-                 memberEntity.getId().getType(),
-                 memberEntity.getId().getUuid(),
-                 applicationScope.getApplication().getType(),
+ 
+         if(logger.isDebugEnabled()) {
+             logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
+                 edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
+                 memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
                  applicationScope.getApplication().getUuid()
-         } );
+             } );
+         }
  
          ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
index 0000000,535af36..fea21ae
mode 000000,100644..100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
@@@ -1,0 -1,211 +1,212 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.usergrid.corepersistence.results;
+ 
+ 
+ import java.util.Iterator;
+ import java.util.NoSuchElementException;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.usergrid.persistence.Results;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndex;
+ import org.apache.usergrid.persistence.index.IndexScope;
+ import org.apache.usergrid.persistence.index.SearchTypes;
+ import org.apache.usergrid.persistence.index.query.CandidateResults;
+ import org.apache.usergrid.persistence.index.query.Query;
+ 
+ 
+ public class ElasticSearchQueryExecutor implements QueryExecutor {
+ 
+     private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+ 
+     private final ResultsLoaderFactory resultsLoaderFactory;
+ 
+     private final ApplicationScope applicationScope;
+ 
 -    private final EntityIndex entityIndex;
++    private final ApplicationEntityIndex entityIndex;
+ 
+     private final IndexScope indexScope;
+ 
+     private final SearchTypes types;
+ 
+     private final Query query;
+ 
+ 
+     private Results currentResults;
+ 
+     private boolean moreToLoad = true;
+ 
+ 
 -    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final EntityIndex entityIndex,
++    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
+                                        final ApplicationScope applicationScope, final IndexScope indexScope,
+                                        final SearchTypes types, final Query query ) {
+         this.resultsLoaderFactory = resultsLoaderFactory;
+         this.applicationScope = applicationScope;
+         this.entityIndex = entityIndex;
+         this.indexScope = indexScope;
+         this.types = types;
+ 
+         //we must deep copy the query passed.  Otherwise we will modify its state with cursors.  Won't fix, not relevant
+         //once we start subscribing to streams.
+         this.query = new Query(query);
+     }
+ 
+ 
+     @Override
+     public Iterator<Results> iterator() {
+         return this;
+     }
+ 
+ 
+     private void loadNextPage() {
+         // Because of possible stale entities, which are filtered out by buildResults(),
+         // we loop until we've got enough results to satisfy the query limit.
+ 
+         final int maxQueries = 10; // max re-queries to satisfy query limit
+ 
+         final int originalLimit = query.getLimit();
+ 
+         Results results = null;
+         int queryCount = 0;
+ 
+         boolean satisfied = false;
+ 
+ 
+         while ( !satisfied && queryCount++ < maxQueries ) {
+ 
+             CandidateResults crs = entityIndex.search( indexScope, types, query );
+ 
+             logger.debug( "Calling build results 1" );
+             results = buildResults( indexScope, query, crs );
+ 
+             if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
+                 satisfied = true;
+             }
+ 
+             /**
+              * In an edge case where we delete stale entities, we could potentially get less results than expected.
+              * This will only occur once during the repair phase.
+              * We need to ensure that we short circuit before we overflow the requested limit during a repair.
+              */
+             else if ( results.size() > 0 ) { // got what we need
+                 satisfied = true;
+             }
+             //we didn't load anything, but there was a cursor, this means a read repair occurred.  We have to short
+             //circuit to avoid over returning the result set
+             else if ( crs.hasCursor() ) {
+                 satisfied = false;
+ 
+                 // need to query for more
+                 // ask for just what we need to satisfy, don't want to exceed limit
+                 query.setCursor( results.getCursor() );
+                 query.setLimit( originalLimit - results.size() );
+ 
+                 logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                     originalLimit, query.getLimit(), queryCount
+                 } );
+             }
+         }
+ 
+         //now set our cursor if we have one for the next iteration
+         if ( results.hasCursor() ) {
+             query.setCursor( results.getCursor() );
+             moreToLoad = true;
+         }
+ 
+         else {
+             moreToLoad = false;
+         }
+ 
+ 
+         //set our current results and the flag
+         this.currentResults = results;
+     }
+ 
+ 
+     /**
+      * Build results from a set of candidates, and discard those that represent stale indexes.
+      *
+      * @param query Query that was executed
+      * @param crs Candidates to be considered for results
+      */
+     private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) {
+ 
+         logger.debug( "buildResults()  from {} candidates", crs.size() );
+ 
+         //get an instance of our results loader
+         final ResultsLoader resultsLoader =
+             this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+ 
+         //load the results
+         final Results results = resultsLoader.loadResults( crs );
+ 
+         //signal for post processing
+         resultsLoader.postProcess();
+ 
+ 
+         results.setCursor( crs.getCursor() );
+ 
+         //ugly and tight coupling, but we don't have a choice until we finish some refactoring
+         results.setQueryExecutor( this );
+ 
+         logger.debug( "Returning results size {}", results.size() );
+ 
+         return results;
+     }
+ 
+ 
+     @Override
+     public boolean hasNext() {
+ 
+         //we've tried to load and it's empty and we have more to load, load the next page
+         if ( currentResults == null ) {
+             //there's nothing left to load, nothing to do
+             if ( !moreToLoad ) {
+                 return false;
+             }
+ 
+             //load the page
+ 
+             loadNextPage();
+         }
+ 
+ 
+         //see if our current results are not null
+         return currentResults != null;
+     }
+ 
+ 
+     @Override
+     public Results next() {
+         if ( !hasNext() ) {
+             throw new NoSuchElementException( "No more results present" );
+         }
+ 
+         final Results toReturn = currentResults;
+ 
+         currentResults = null;
+ 
+         return toReturn;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index d588476,9c768bf..26395b4
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@@ -25,9 -27,7 +27,8 @@@ import org.apache.commons.lang.math.Num
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import org.apache.commons.lang.math.NumberUtils;
  
 +
  public class ListUtils extends org.apache.commons.collections.ListUtils {
      private static final Logger LOG = LoggerFactory.getLogger( ListUtils.class );
  


[06/10] incubator-usergrid git commit: merge from two-o-dev

Posted by sf...@apache.org.
merge from two-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/208ed2b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/208ed2b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/208ed2b3

Branch: refs/heads/USERGRID-501
Commit: 208ed2b3c93771faf51a41903d617f67cc32b14a
Parents: f695724 6db7ce9
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 10:20:30 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 10:20:30 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 304 ++++++-------------
 .../results/CollectionRefsVerifier.java         |  44 +++
 .../CollectionResultsLoaderFactoryImpl.java     |  60 ++++
 .../results/ConnectionRefsVerifier.java         |  61 ++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 ++++
 .../results/ElasticSearchQueryExecutor.java     | 212 +++++++++++++
 .../corepersistence/results/QueryExecutor.java  |  37 +++
 .../corepersistence/results/RefsVerifier.java   |  42 ---
 .../results/ResultsLoaderFactory.java           |   3 +-
 .../results/ResultsLoaderFactoryImpl.java       |  62 ----
 .../persistence/MultiQueryIterator.java         |   6 +-
 .../apache/usergrid/persistence/Results.java    |  26 +-
 .../cassandra/QueryProcessorImpl.java           |  12 +-
 .../usergrid/persistence/PathQueryIT.java       |  46 ++-
 stack/core/src/test/resources/log4j.properties  |   2 +
 .../persistence/graph/GraphManagerIT.java       |   2 +-
 .../usergrid/persistence/index/IndexFig.java    |  16 +-
 .../index/impl/BufferQueueInMemoryImpl.java     |  10 +-
 .../index/impl/EsEntityIndexBatchImpl.java      |   2 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  61 ++--
 .../usergrid/persistence/index/query/Query.java |  55 ++--
 .../persistence/index/utils/ListUtils.java      |   3 +-
 .../cassandra/ManagementServiceImpl.java        |  18 +-
 .../providers/PingIdentityProvider.java         |   5 +-
 24 files changed, 730 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208ed2b3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 8f8ac5e,90b8e35..f62648d
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@@ -40,7 -65,7 +40,7 @@@ public class EsEntityIndexBatchImpl imp
  
      private final ApplicationScope applicationScope;
  
--    private final IndexIdentifier.IndexAlias alias;
++    private final IndexAlias alias;
      private final IndexIdentifier indexIdentifier;
  
      private final IndexBufferProducer indexBatchBufferProducer;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/208ed2b3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index 6454b48,6362c0a..fa7def4
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@@ -106,31 -117,86 +106,39 @@@ public class Query 
      }
  
  
+     /**
+      * Creates a deep copy of a query from another query
+      * @param q
+      */
      public Query( Query q ) {
-         if ( q != null ) {
-             type = q.type;
-             sortPredicates = q.sortPredicates != null
-                     ? new ArrayList<SortPredicate>( q.sortPredicates ) : null;
-             startResult = q.startResult;
-             selectAssignments = q.selectAssignments != null
-                     ? new LinkedHashMap<String, String>( q.selectAssignments ) : null;
-             mergeSelectResults = q.mergeSelectResults;
-             //level = q.level;
-             connectionType = q.connectionType;
-             permissions = q.permissions != null ? new ArrayList<String>( q.permissions ) : null;
-             reversed = q.reversed;
-             reversedSet = q.reversedSet;
-             startTime = q.startTime;
-             finishTime = q.finishTime;
-             resolution = q.resolution;
-             pad = q.pad;
-             rootOperand = q.rootOperand;
-             identifiers = q.identifiers != null
-                     ? new ArrayList<Identifier>( q.identifiers ) : null;
-             counterFilters = q.counterFilters != null
-                     ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
-             collection = q.collection;
 -        if ( q == null ) {
++        if (q == null) {
+             return;
          }
 -
+         type = q.type;
+         sortPredicates = q.sortPredicates != null
 -                ? new ArrayList<>( q.sortPredicates ) : null;
++                ? new ArrayList<SortPredicate>(q.sortPredicates) : null;
+         startResult = q.startResult;
 -        cursor = q.cursor;
 -        limit = q.limit;
+         selectAssignments = q.selectAssignments != null
 -                ? new LinkedHashMap<>( q.selectAssignments ) : null;
++                ? new LinkedHashMap<String, String>(q.selectAssignments) : null;
+         mergeSelectResults = q.mergeSelectResults;
+         //level = q.level;
+         connectionType = q.connectionType;
 -        permissions = q.permissions != null ? new ArrayList<>( q.permissions ) : null;
++        permissions = q.permissions != null ? new ArrayList<String>(q.permissions) : null;
+         reversed = q.reversed;
+         reversedSet = q.reversedSet;
+         startTime = q.startTime;
+         finishTime = q.finishTime;
+         resolution = q.resolution;
+         pad = q.pad;
+         rootOperand = q.rootOperand;
+         identifiers = q.identifiers != null
 -                ? new ArrayList<>( q.identifiers ) : null;
++                ? new ArrayList<Identifier>(q.identifiers) : null;
+         counterFilters = q.counterFilters != null
 -                ? new ArrayList<>( q.counterFilters ) : null;
++                ? new ArrayList<CounterFilterPredicate>(q.counterFilters) : null;
+         collection = q.collection;
 -        level = q.level;
 -
 -    }
 -
 -
 -    public QueryBuilder createQueryBuilder( final String context ) {
 -
 -
 -        QueryBuilder queryBuilder = null;
 -
 -
 -        //we have a root operand.  Translate our AST into an ES search
 -        if ( getRootOperand() != null ) {
 -            // In the case of geo only queries, this will return null into the query builder.
 -            // Once we start using tiles, we won't need this check any longer, since a geo query
 -            // will return a tile query + post filter
 -            QueryVisitor v = new EsQueryVistor();
 -
 -            try {
 -                getRootOperand().visit( v );
 -            }
 -            catch ( IndexException ex ) {
 -                throw new RuntimeException( "Error building ElasticSearch query", ex );
 -            }
 -
 -
 -            queryBuilder = v.getQueryBuilder();
 -        }
 -
 -
 -         // Add our filter for context to our query for fast execution.
 -         // Fast because it utilizes bitsets internally. See this post for more detail.
 -         // http://www.elasticsearch.org/blog/all-about-elasticsearch-filter-bitsets/
 -
 -        // TODO evaluate performance when it's an all query.
 -        // Do we need to put the context term first for performance?
 -        if ( queryBuilder != null ) {
 -            queryBuilder = QueryBuilders.boolQuery().must( queryBuilder ).must( QueryBuilders
 -                    .termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context ) );
 -        }
+ 
 -        //nothing was specified ensure we specify the context in the search
 -        else {
 -            queryBuilder = QueryBuilders.termQuery( IndexingUtils.ENTITY_CONTEXT_FIELDNAME, context );
 -        }
++        level = q.level;
+ 
 -        return queryBuilder;
      }
  
  
@@@ -235,7 -317,8 +243,6 @@@
          Boolean reversed = ListUtils.firstBoolean( params.get( "reversed" ) );
          String connection = ListUtils.first( params.get( "connectionType" ) );
          UUID start = ListUtils.firstUuid( params.get( "start" ) );
--        String cursor = ListUtils.first( params.get( "cursor" ) );
 -        Integer limit = ListUtils.firstInteger( params.get( "limit" ) );
          List<String> permissions = params.get( "permission" );
          Long startTime = ListUtils.firstLong( params.get( "start_time" ) );
          Long finishTime = ListUtils.firstLong( params.get( "end_time" ) );


[05/10] incubator-usergrid git commit: add data migration

Posted by sf...@apache.org.
add data migration


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/f695724b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/f695724b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/f695724b

Branch: refs/heads/USERGRID-501
Commit: f695724b0417131eb9592a642ae8fa7a29afd8cd
Parents: 8a62d11
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 10:14:12 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 10:14:12 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexAlias.java  |  38 ++++++
 .../persistence/index/IndexIdentifier.java      |  18 ---
 .../persistence/index/guice/IndexModule.java    |  19 +++
 .../impl/EsApplicationEntityIndexImpl.java      |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 +-
 .../persistence/index/impl/EsIndexCache.java    |   7 +-
 .../impl/SearchRequestBuilderStrategy.java      |   5 +-
 .../migration/EsIndexDataMigrationImpl.java     | 134 +++++++++++++++++++
 .../index/migration/EsIndexMigrationPlugin.java |  46 +++++++
 .../index/migration/IndexMigration.java         |  35 +++++
 10 files changed, 281 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
new file mode 100644
index 0000000..a04f80e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+/**
+ * Abstraction for Index alias names
+ */
+public class IndexAlias{
+    private final String readAlias;
+    private final String writeAlias;
+
+    public IndexAlias(IndexFig indexFig,String indexBase) {
+        this.writeAlias = indexBase + "_write_" + indexFig.getAliasPostfix();
+        this.readAlias = indexBase + "_read_" + indexFig.getAliasPostfix();
+    }
+
+    public String getReadAlias() {
+        return readAlias;
+    }
+
+    public String getWriteAlias() {
+        return writeAlias;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
index c659ed6..48c48f3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
@@ -55,24 +55,6 @@ public class IndexIdentifier{
     }
 
 
-    public class IndexAlias{
-        private final String readAlias;
-        private final String writeAlias;
-
-        public IndexAlias(IndexFig indexFig,String indexBase) {
-            this.writeAlias = indexBase + "_write_" + indexFig.getAliasPostfix();
-            this.readAlias = indexBase + "_read_" + indexFig.getAliasPostfix();
-        }
-
-        public String getReadAlias() {
-            return readAlias;
-        }
-
-        public String getWriteAlias() {
-            return writeAlias;
-        }
-    }
-
     public String toString() {
         return "index id"+config.getIndexPrefix();
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index c9125c5..a42dea8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -19,6 +19,11 @@
 
 package org.apache.usergrid.persistence.index.guice;
 
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.*;
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -28,6 +33,9 @@ import org.apache.usergrid.persistence.index.impl.EsEntityIndexFactoryImpl;
 import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferConsumerImpl;
 import org.apache.usergrid.persistence.index.impl.EsIndexBufferProducerImpl;
+import org.apache.usergrid.persistence.index.migration.EsIndexDataMigrationImpl;
+import org.apache.usergrid.persistence.index.migration.EsIndexMigrationPlugin;
+import org.apache.usergrid.persistence.index.migration.IndexMigration;
 import org.apache.usergrid.persistence.map.guice.MapModule;
 import org.apache.usergrid.persistence.queue.guice.QueueModule;
 
@@ -56,6 +64,17 @@ public class IndexModule extends AbstractModule {
 
 
         bind( BufferQueue.class).toProvider( QueueProvider.class );
+
+        //wire up the index data migration
+        Multibinder<DataMigration<ApplicationScope>> dataMigrationMultibinder =
+                Multibinder.newSetBinder( binder(), new TypeLiteral<DataMigration<ApplicationScope>>() {}, IndexMigration.class );
+
+
+        dataMigrationMultibinder.addBinding().to(EsIndexDataMigrationImpl.class);
+
+
+        //wire up the collection migration plugin
+        Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to(EsIndexMigrationPlugin.class);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index ca071e2..27791ae 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -76,7 +76,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
     private final EsIndexCache indexCache;
     private final IndexFig indexFig;
     private final EsProvider esProvider;
-    private final IndexIdentifier.IndexAlias alias;
+    private final IndexAlias alias;
     private final Timer deleteApplicationTimer;
     private final Meter deleteApplicationMeter;
     private final SearchRequestBuilderStrategy searchRequest;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 259fa55..cab8ded 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -77,7 +77,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
 
     public static final String DEFAULT_TYPE = "_default_";
 
-    private final IndexIdentifier.IndexAlias alias;
+    private final IndexAlias alias;
     private final IndexBufferProducer indexBatchBufferProducer;
     private final IndexFig indexFig;
     private final Timer addTimer;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
index ef518dd..9ba89ca 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexCache.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.usergrid.persistence.index.IndexAlias;
 import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
 import org.elasticsearch.client.AdminClient;
 import org.elasticsearch.cluster.metadata.AliasMetaData;
@@ -98,7 +99,7 @@ public class EsIndexCache {
     /**
      * Get indexes for an alias
      */
-    public String[] getIndexes( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
+    public String[] getIndexes( IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
         String[] indexes;
         try {
             indexes = aliasIndexCache.get( getAliasName( alias, aliasType ) );
@@ -127,7 +128,7 @@ public class EsIndexCache {
      * @param aliasType
      * @return
      */
-    private String getAliasName( IndexIdentifier.IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
+    private String getAliasName( IndexAlias alias, AliasedEntityIndex.AliasType aliasType ) {
         return aliasType == AliasedEntityIndex.AliasType.Read ? alias.getReadAlias() : alias.getWriteAlias();
     }
 
@@ -135,7 +136,7 @@ public class EsIndexCache {
     /**
      * clean up cache
      */
-    public void invalidate( IndexIdentifier.IndexAlias alias ) {
+    public void invalidate( IndexAlias alias ) {
         aliasIndexCache.invalidate( alias.getWriteAlias() );
         aliasIndexCache.invalidate( alias.getReadAlias() );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
index d146e4c..15796f9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/SearchRequestBuilderStrategy.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.index.impl;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.index.IndexAlias;
 import org.apache.usergrid.persistence.index.IndexIdentifier;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
@@ -53,11 +54,11 @@ public class SearchRequestBuilderStrategy {
 
     private final EsProvider esProvider;
     private final ApplicationScope applicationScope;
-    private final IndexIdentifier.IndexAlias alias;
+    private final IndexAlias alias;
     private final int cursorTimeout;
     public static final int MAX_LIMIT = 1000;
 
-    public SearchRequestBuilderStrategy(final EsProvider esProvider, final ApplicationScope applicationScope, final IndexIdentifier.IndexAlias alias, int cursorTimeout){
+    public SearchRequestBuilderStrategy(final EsProvider esProvider, final ApplicationScope applicationScope, final IndexAlias alias, int cursorTimeout){
 
         this.esProvider = esProvider;
         this.applicationScope = applicationScope;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
new file mode 100644
index 0000000..b5dab53
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.migration;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.migration.data.ProgressObserver;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexAlias;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexIdentifier;
+import org.apache.usergrid.persistence.index.impl.EsIndexCache;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
+import org.elasticsearch.client.AdminClient;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Classy class class.
+ */
+public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope> {
+
+    private final AliasedEntityIndex entityIndex;
+    private final EsProvider provider;
+    private final IndexFig indexFig;
+    private final IndexIdentifier indexIdentifier;
+    private final EsIndexCache indexCache;
+
+    @Inject
+    public EsIndexDataMigrationImpl(AliasedEntityIndex entityIndex, EsProvider provider, IndexFig indexFig, IndexIdentifier indexIdentifier, EsIndexCache indexCache){
+        this.entityIndex = entityIndex;
+        this.provider = provider;
+        this.indexFig = indexFig;
+        this.indexIdentifier = indexIdentifier;
+        this.indexCache = indexCache;
+    }
+
+    @Override
+    public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> migrationDataProvider, ProgressObserver observer) {
+        migrationDataProvider.getData().doOnNext(applicationScope -> {
+            LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig,applicationScope);
+            String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read);
+            AdminClient adminClient = provider.getClient().admin();
+
+            for (String index : indexes) {
+                IndicesAliasesRequestBuilder aliasesRequestBuilder = adminClient.indices().prepareAliases();
+                aliasesRequestBuilder = adminClient.indices().prepareAliases();
+                // add read alias
+                aliasesRequestBuilder.addAlias(index, indexIdentifier.getAlias().getReadAlias());
+            }
+        });
+        return 0;
+    }
+
+    @Override
+    public boolean supports(int currentVersion) {
+        return false;
+    }
+
+    @Override
+    public int getMaxVersion() {
+        return 0;
+    }
+    /**
+     * Class is used to generate an index name and alias name the old way via app name
+     */
+    public class LegacyIndexIdentifier{
+        private final IndexFig config;
+        private final ApplicationScope applicationScope;
+
+        public LegacyIndexIdentifier(IndexFig config, ApplicationScope applicationScope) {
+            this.config = config;
+            this.applicationScope = applicationScope;
+        }
+
+        /**
+         * Get the alias name
+         * @return
+         */
+        public IndexAlias getAlias() {
+            return new IndexAlias(config,getIndexBase());
+        }
+
+        /**
+         * Get index name, send in additional parameter to add incremental indexes
+         * @param suffix
+         * @return
+         */
+        public String getIndex(String suffix) {
+            if (suffix != null) {
+                return getIndexBase() + "_" + suffix;
+            } else {
+                return getIndexBase();
+            }
+        }
+
+        /**
+         * returns the base name for index which will be used to add an alias and index
+         * @return
+         */
+        private String getIndexBase() {
+            StringBuilder sb = new StringBuilder();
+            sb.append(config.getIndexPrefix()).append(IndexingUtils.SEPARATOR);
+            IndexingUtils.idString(sb, applicationScope.getApplication());
+            return sb.toString();
+        }
+
+
+
+        public String toString() {
+            return "application: " + applicationScope.getApplication().getUuid();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMigrationPlugin.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMigrationPlugin.java
new file mode 100644
index 0000000..a28c701
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexMigrationPlugin.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.migration;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.core.migration.data.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import java.util.Set;
+
+/**
+ * Migration plugin named "index-migration" that is handed the data migrations bound with {@link IndexMigration}.
+ */
+public class EsIndexMigrationPlugin  extends AbstractMigrationPlugin<ApplicationScope>{
+
+    @Inject
+    public EsIndexMigrationPlugin(@IndexMigration final Set<DataMigration<ApplicationScope>> entityDataMigrations,
+                                  final MigrationDataProvider<ApplicationScope> entityIdScopeDataMigrationProvider,
+                                  final MigrationInfoSerialization migrationInfoSerialization ){
+        super(entityDataMigrations,entityIdScopeDataMigrationProvider,migrationInfoSerialization);
+    }
+
+    @Override
+    public String getName() {
+        return "index-migration";
+    }
+
+    @Override
+    public PluginPhase getPhase() {
+        return PluginPhase.MIGRATE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/f695724b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexMigration.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexMigration.java
new file mode 100644
index 0000000..c398aac
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexMigration.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index.migration;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Index migration annotation
+ */
+
+@BindingAnnotation
+@Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface IndexMigration {}


[07/10] incubator-usergrid git commit: add version migration for indexes

Posted by sf...@apache.org.
add version migration for indexes


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bf0718e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bf0718e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bf0718e6

Branch: refs/heads/USERGRID-501
Commit: bf0718e611bb06d8bf901ba34f5d8172d5ef02bb
Parents: 208ed2b
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 10:53:20 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 10:53:20 2015 -0600

----------------------------------------------------------------------
 .../AllApplicationsObservable.java              |  3 +-
 .../usergrid/corepersistence/CoreModule.java    |  9 ++++-
 .../rx/impl/AllApplicationsObservableImpl.java  |  5 ++-
 .../persistence/index/guice/IndexModule.java    | 13 ++++++-
 .../index/impl/EsEntityIndexImpl.java           |  9 ++++-
 .../migration/EsIndexDataMigrationImpl.java     | 20 ++++++++--
 .../index/migration/IndexDataVersions.java      | 39 ++++++++++++++++++++
 .../index/guice/TestIndexModule.java            |  7 +++-
 8 files changed, 94 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
index 1187cf2..055223e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/AllApplicationsObservable.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Application;
 
@@ -29,7 +30,7 @@ import rx.Observable;
 /**
  * Interface for generating an observable of all ApplicationScope
  */
-public interface AllApplicationsObservable {
+public interface AllApplicationsObservable extends MigrationDataProvider<ApplicationScope>{
 
     /**
      * Return all applications in our system

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index 2e9b780..d994419 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -16,6 +16,7 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.springframework.context.ApplicationContext;
 
 import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
@@ -96,7 +97,13 @@ public class CoreModule  extends AbstractModule {
                     AllNodesInGraphImpl.class );
             }
         } );
-        install(new IndexModule());
+        install(new IndexModule(){
+            @Override
+            public void configureMigrationProvider() {
+                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
+                    AllApplicationsObservable.class );
+            }
+        });
        //        install(new MapModule());   TODO, re-enable when index module doesn't depend on queue
        //        install(new QueueModule());
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
index 0fc5452..aa5b5dc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllApplicationsObservableImpl.java
@@ -138,5 +138,8 @@ public class AllApplicationsObservableImpl implements AllApplicationsObservable
     }
 
 
-
+    @Override
+    public Observable<ApplicationScope> getData() {
+        return getAllApplications();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index a42dea8..7ecce54 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -42,7 +42,7 @@ import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 
-public class IndexModule extends AbstractModule {
+public abstract class IndexModule extends AbstractModule {
 
     @Override
     protected void configure() {
@@ -75,8 +75,17 @@ public class IndexModule extends AbstractModule {
 
         //wire up the collection migration plugin
         Multibinder.newSetBinder( binder(), MigrationPlugin.class ).addBinding().to(EsIndexMigrationPlugin.class);
-    }
 
 
+        //invoke the migration plugin config
+        configureMigrationProvider();
+    }
+
+    /**
+     * Gives callers the ability to configure an instance of
+     *
+     * MigrationDataProvider<ApplicationScope> for providing data migrations
+     */
+    public abstract void configureMigrationProvider();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index cab8ded..844260f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -27,10 +27,12 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.util.Health;
 import org.apache.usergrid.persistence.index.*;
 import org.apache.usergrid.persistence.index.exceptions.IndexException;
 
+import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
 import org.elasticsearch.action.ActionFuture;
@@ -71,7 +73,7 @@ import java.util.*;
  * Implements index using ElasticSearch Java API.
  */
 @Singleton
-public class EsEntityIndexImpl implements AliasedEntityIndex {
+public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
 
     private static final Logger logger = LoggerFactory.getLogger( EsEntityIndexImpl.class );
 
@@ -442,6 +444,11 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
         return Health.RED;
     }
 
+    @Override
+    public int getImplementationVersion() {
+        return IndexDataVersions.SINGLE_INDEX.getVersion();
+    }
+
 
     /**
      * Interface for operations.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
index b5dab53..2a0bb15 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
@@ -20,6 +20,7 @@ import com.google.inject.Inject;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.migration.data.ProgressObserver;
+import org.apache.usergrid.persistence.core.migration.data.VersionedData;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.AliasedEntityIndex;
 import org.apache.usergrid.persistence.index.IndexAlias;
@@ -30,9 +31,12 @@ import org.apache.usergrid.persistence.index.impl.EsProvider;
 import org.apache.usergrid.persistence.index.impl.IndexingUtils;
 import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
 import org.elasticsearch.client.AdminClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Classy class class.
@@ -44,6 +48,8 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope>
     private final IndexFig indexFig;
     private final IndexIdentifier indexIdentifier;
     private final EsIndexCache indexCache;
+    private final VersionedData dataVersion;
+    private static final Logger log = LoggerFactory.getLogger(EsIndexDataMigrationImpl.class);
 
     @Inject
     public EsIndexDataMigrationImpl(AliasedEntityIndex entityIndex, EsProvider provider, IndexFig indexFig, IndexIdentifier indexIdentifier, EsIndexCache indexCache){
@@ -52,10 +58,12 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope>
         this.indexFig = indexFig;
         this.indexIdentifier = indexIdentifier;
         this.indexCache = indexCache;
+        this.dataVersion = (VersionedData) entityIndex;
     }
 
     @Override
     public int migrate(int currentVersion, MigrationDataProvider<ApplicationScope> migrationDataProvider, ProgressObserver observer) {
+        final AtomicInteger integer = new AtomicInteger();
         migrationDataProvider.getData().doOnNext(applicationScope -> {
             LegacyIndexIdentifier legacyIndexIdentifier = new LegacyIndexIdentifier(indexFig,applicationScope);
             String[] indexes = indexCache.getIndexes(legacyIndexIdentifier.getAlias(), AliasedEntityIndex.AliasType.Read);
@@ -66,19 +74,23 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope>
                 aliasesRequestBuilder = adminClient.indices().prepareAliases();
                 // add read alias
                 aliasesRequestBuilder.addAlias(index, indexIdentifier.getAlias().getReadAlias());
+                integer.incrementAndGet();
             }
-        });
-        return 0;
+        })
+        .doOnError(error -> log.error("failed to migrate index",error))
+        .toBlocking().last();
+
+        return integer.get();
     }
 
     @Override
     public boolean supports(int currentVersion) {
-        return false;
+        return currentVersion < dataVersion.getImplementationVersion();
     }
 
     @Override
     public int getMaxVersion() {
-        return 0;
+        return dataVersion.getImplementationVersion();
     }
     /**
      * Class is used to generate an index name and alias name the old way via app name

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
new file mode 100644
index 0000000..921d86a
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/IndexDataVersions.java
@@ -0,0 +1,39 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.index.migration;
+
+/**
+ * Index data versions: {@link #MANY_INDEXES} is version 0, {@link #SINGLE_INDEX} is version 1.
+ */
+public enum IndexDataVersions {
+    MANY_INDEXES(0),
+    SINGLE_INDEX(1);
+
+    private final int version;
+
+
+    private IndexDataVersions( final int version ) {this.version = version;}
+
+
+    public int getVersion() {
+        return version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bf0718e6/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4cf46d6..4e6700c 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -33,7 +33,12 @@ public class TestIndexModule extends TestModule {
         install( new CommonModule());
 
         // configure collections and our core astyanax framework
-        install( new IndexModule()  );
+        install( new IndexModule(){
+            @Override
+            public  void configureMigrationProvider(){
+
+            }
+        });
         install( new GuicyFigModule(IndexTestFig.class) );
     }
 }


[02/10] incubator-usergrid git commit: Refactored search to encapsulate repair and load logic

Posted by sf...@apache.org.
Refactored search to encapsulate repair and load logic

Loaders and verifiers are separate for connections and collections

Fixes broken test in GraphManager if writes occur in the same millisecond

Reduces fork count to 1 and uses parallel threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c7fa864f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c7fa864f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c7fa864f

Branch: refs/heads/USERGRID-501
Commit: c7fa864f73e742c8dd37071ea57d5b4e2ec31422
Parents: 5937f9f
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 25 12:38:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 25 16:45:31 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 316 ++++++-------------
 .../results/CollectionRefsVerifier.java         |  44 +++
 .../CollectionResultsLoaderFactoryImpl.java     |  60 ++++
 .../results/ConnectionRefsVerifier.java         |  61 ++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 ++++
 .../results/ElasticSearchQueryExecutor.java     | 211 +++++++++++++
 .../corepersistence/results/QueryExecutor.java  |  37 +++
 .../corepersistence/results/RefsVerifier.java   |  42 ---
 .../results/ResultsLoaderFactory.java           |   3 +-
 .../results/ResultsLoaderFactoryImpl.java       |  62 ----
 .../persistence/MultiQueryIterator.java         |   6 +-
 .../apache/usergrid/persistence/Results.java    |  26 +-
 .../cassandra/QueryProcessorImpl.java           |  12 +-
 .../usergrid/persistence/PathQueryIT.java       |  46 ++-
 stack/core/src/test/resources/log4j.properties  |   2 +
 .../persistence/graph/GraphManagerIT.java       |   9 +-
 .../usergrid/persistence/index/IndexFig.java    |  16 +-
 .../index/impl/BufferQueueInMemoryImpl.java     |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  61 ++--
 .../usergrid/persistence/index/query/Query.java |  58 ++--
 .../persistence/index/utils/ListUtils.java      |   2 -
 stack/pom.xml                                   |   2 +-
 22 files changed, 729 insertions(+), 422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 2eeee28..7179baf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -29,14 +29,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
 
-import org.apache.usergrid.corepersistence.results.ResultsLoader;
-import org.apache.usergrid.corepersistence.results.ResultsLoaderFactory;
-import org.apache.usergrid.corepersistence.results.ResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.results.ConnectionResultsLoaderFactoryImpl;
+import org.apache.usergrid.corepersistence.results.ElasticSearchQueryExecutor;
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
+import org.apache.usergrid.corepersistence.results.CollectionResultsLoaderFactoryImpl;
 import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.ConnectedEntityRef;
@@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.cassandra.index.IndexBucketScanner;
 import org.apache.usergrid.persistence.cassandra.index.IndexScanner;
 import org.apache.usergrid.persistence.cassandra.index.NoOpIndexScanner;
 import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.entities.Group;
@@ -80,8 +81,6 @@ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 import org.apache.usergrid.persistence.index.IndexScope;
 import org.apache.usergrid.persistence.index.SearchTypes;
 import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
-import org.apache.usergrid.persistence.index.query.CandidateResult;
-import org.apache.usergrid.persistence.index.query.CandidateResults;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -115,6 +114,7 @@ import me.prettyprint.hector.api.beans.HColumn;
 import me.prettyprint.hector.api.mutation.Mutator;
 import rx.Observable;
 import rx.functions.Action1;
+import rx.functions.Action2;
 import rx.functions.Func1;
 
 import static java.util.Arrays.asList;
@@ -186,8 +186,6 @@ public class CpRelationManager implements RelationManager {
 
     private IndexBucketLocator indexBucketLocator;
 
-    private ResultsLoaderFactory resultsLoaderFactory;
-
     private MetricsFactory metricsFactory;
     private Timer updateCollectionTimer;
     private Timer createConnectionTimer;
@@ -258,7 +256,6 @@ public class CpRelationManager implements RelationManager {
         // commented out because it is possible that CP entity has not been created yet
         Assert.notNull( cpHeadEntity, "cpHeadEntity cannot be null" );
 
-        this.resultsLoaderFactory = new ResultsLoaderFactoryImpl( managerCache );
 
         return this;
     }
@@ -278,7 +275,7 @@ public class CpRelationManager implements RelationManager {
         });
 
         Observable<String> types= gm.getEdgeTypesFromSource(
-            new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
+            new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
 
         Iterator<String> iter = types.toBlockingObservable().getIterator();
         while ( iter.hasNext() ) {
@@ -316,63 +313,56 @@ public class CpRelationManager implements RelationManager {
 
     /**
      * Gets containing collections and/or connections depending on the edge type you pass in
+     *
      * @param limit Max number to return
      * @param edgeType Edge type, edge type prefix or null to allow any edge type
      * @param fromEntityType Only consider edges from entities of this type
      */
-    Map<EntityRef, Set<String>> getContainers( int limit, String edgeType, String fromEntityType ) {
+    Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
 
         Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
 
-        GraphManager gm = managerCache.getGraphManager(applicationScope);
-
-        Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-            cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
-
-        logger.debug("getContainers(): "
-                + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}",
-            new Object[] {
-                edgeType,
-                cpHeadEntity.getId().getType(),
-                cpHeadEntity.getId().getUuid(),
-                applicationScope.getApplication(),
-                edgeTypes.hasNext()
-        });
+        final GraphManager gm = managerCache.getGraphManager( applicationScope );
 
-        while ( edgeTypes.hasNext() ) {
+        Observable<Edge> edges =
+            gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
+              .flatMap( new Func1<String, Observable<Edge>>() {
+                  @Override
+                  public Observable<Edge> call( final String edgeType ) {
+                      return gm.loadEdgesToTarget(
+                          new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+                              SearchByEdgeType.Order.DESCENDING, null ) );
 
-            String etype = edgeTypes.next();
+                  }
+              } );
 
-            Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+        //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memory
+        if ( limit > -1 ) {
+            edges = edges.take( limit );
+        }
 
-            Iterator<Edge> iter = edges.toBlockingObservable().getIterator();
-            while ( iter.hasNext() ) {
-                Edge edge = iter.next();
 
-                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
-                    logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
-                    continue;
+        return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() {
+            @Override
+            public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) {
+                if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+                    logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
+                    return;
                 }
 
-                EntityRef eref = new SimpleEntityRef(
-                        edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                final EntityRef eref =
+                    new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
 
-                String name = null;
-                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+                String name;
+                if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
                     name = CpNamingUtils.getConnectionType( edge.getType() );
-                } else {
+                }
+                else {
                     name = CpNamingUtils.getCollectionName( edge.getType() );
                 }
-                addMapSet( results, eref, name );
-            }
-
-            if ( limit > 0 && results.keySet().size() >= limit ) {
-                break;
+                addMapSet( entityRefSetMap, eref, name );
             }
-        }
-
-        return results;
+        } ).toBlocking().last();
     }
 
 
@@ -660,9 +650,6 @@ public class CpRelationManager implements RelationManager {
         CollectionScope memberScope = getCollectionScopeNameFromEntityType(
                 applicationScope.getApplication(), itemRef.getType());
 
-        //TODO, this double load should disappear once events are in
-        Id entityId = new SimpleId( itemRef.getUuid(), itemRef.getType() );
-
         if ( memberEntity == null ) {
             throw new RuntimeException(
                     "Unable to load entity uuid=" + itemRef.getUuid() + " type=" + itemRef.getType() );
@@ -694,34 +681,34 @@ public class CpRelationManager implements RelationManager {
         GraphManager gm = managerCache.getGraphManager( applicationScope );
         gm.writeEdge( edge ).toBlockingObservable().last();
 
-        logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
-            new Object[] {
-                edgeType,
-                cpHeadEntity.getId().getType(),
-                cpHeadEntity.getId().getUuid(),
-                memberEntity.getId().getType(),
-                memberEntity.getId().getUuid(),
-                applicationScope.getApplication().getType(),
+
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
+                edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
+                memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
                 applicationScope.getApplication().getUuid()
-        } );
+            } );
+        }
 
         ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
 
-        logger.debug( "Added entity {}:{} to collection {}", new Object[] {
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Added entity {}:{} to collection {}", new Object[] {
                 itemRef.getUuid().toString(), itemRef.getType(), collName
-        } );
-
+            } );
+        }
         //        logger.debug("With head entity scope is {}:{}:{}", new Object[] {
         //            headEntityScope.getApplication().toString(),
         //            headEntityScope.getOwner().toString(),
         //            headEntityScope.getName()});
 
-        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
-            getRelationManager( itemEntity ).addToCollection(
-                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
-            getRelationManager( itemEntity ).addToCollection(
-                    collection.getLinkedCollection(), headEntity, false );
-        }
+        //TODO T.N.  This shouldn't even be necessary any longer, graph maintains 2 edges.
+//        if ( connectBack && collection != null && collection.getLinkedCollection() != null ) {
+//            getRelationManager( itemEntity ).addToCollection(
+//                    collection.getLinkedCollection(), headEntity, cpHeadEntity, false );
+//            getRelationManager( itemEntity ).addToCollection(
+//                    collection.getLinkedCollection(), headEntity, false );
+//        }
 
         return itemEntity;
     }
@@ -951,65 +938,19 @@ public class CpRelationManager implements RelationManager {
 
         logger.debug( "Searching scope {}:{}",
 
-                indexScope.getOwner().toString(), indexScope.getName() );
+            indexScope.getOwner().toString(), indexScope.getName() );
 
         query.setEntityType( collection.getType() );
         query = adjustQuery( query );
 
-        // Because of possible stale entities, which are filtered out by buildResults(),
-        // we loop until the we've got enough results to satisfy the query limit.
 
-        int maxQueries = 10; // max re-queries to satisfy query limit
-
-        final int originalLimit = query.getLimit();
-
-        Results results = null;
-        int queryCount = 0;
-
-        boolean satisfied = false;
+        final CollectionResultsLoaderFactoryImpl resultsLoaderFactory = new CollectionResultsLoaderFactoryImpl( managerCache );
 
 
+        //execute the query and return our next result
+        final QueryExecutor executor = new ElasticSearchQueryExecutor( resultsLoaderFactory, ei, applicationScope, indexScope, types, query );
 
-        while ( !satisfied && queryCount++ < maxQueries ) {
-
-            CandidateResults crs = ei.search( indexScope, types, query );
-
-            if ( results == null ) {
-                logger.debug( "Calling build results 1" );
-                results = buildResults( indexScope, query, crs, collName );
-            }
-            else {
-                logger.debug( "Calling build results 2" );
-                Results newResults = buildResults(indexScope,  query, crs, collName );
-                results.merge( newResults );
-            }
-
-            if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
-                satisfied = true;
-            }
-
-            /**
-             * In an edge case where we delete stale entities, we could potentially get more results than expected.  This will only occur once during the repair phase.
-             * We need to ensure that we short circuit if we overflow the requested limit during a repair.
-             */
-            else if ( results.size() >= originalLimit ) { // got what we need
-                satisfied = true;
-            }
-            else if ( crs.hasCursor() ) {
-                satisfied = false;
-
-                // need to query for more
-                // ask for just what we need to satisfy, don't want to exceed limit
-                query.setCursor( results.getCursor() );
-                query.setLimit( originalLimit - results.size() );
-
-                logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
-                        originalLimit, query.getLimit(), queryCount
-                } );
-            }
-        }
-
-        return results;
+        return executor.next();
     }
 
 
@@ -1350,6 +1291,7 @@ public class CpRelationManager implements RelationManager {
     public Results getConnectedEntities(
             String connectionType, String connectedEntityType, Level level ) throws Exception {
 
+        //until this is refactored properly, we will delegate to a search by query
         Results raw = null;
 
         Preconditions.checkNotNull( connectionType, "connectionType cannot be null" );
@@ -1357,50 +1299,11 @@ public class CpRelationManager implements RelationManager {
         Query query = new Query();
         query.setConnectionType( connectionType );
         query.setEntityType( connectedEntityType );
+        query.setResultsLevel( level );
 
-        if ( connectionType == null ) {
-            raw = searchConnectedEntities( query );
-        }
-        else {
-
-            headEntity = em.validate( headEntity );
-
-
-            IndexScope indexScope = new IndexScopeImpl(
-                    cpHeadEntity.getId(), CpNamingUtils.getConnectionScopeName( connectionType ) );
-
-            final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType );
-
-
-            final EntityIndex ei = managerCache.getEntityIndex( applicationScope );
-
-
-            logger.debug("Searching connected entities from scope {}:{}",
-                indexScope.getOwner().toString(),
-                indexScope.getName());
-
-            query = adjustQuery( query );
-            CandidateResults crs = ei.search( indexScope, searchTypes,  query );
-
-            raw = buildResults( indexScope, query, crs, query.getConnectionType() );
-        }
-
-        if ( Level.ALL_PROPERTIES.equals( level ) ) {
-            List<Entity> entities = new ArrayList<Entity>();
-            for ( EntityRef ref : raw.getEntities() ) {
-                Entity entity = em.get( ref );
-                entities.add( entity );
-            }
-            return Results.fromEntities( entities );
-        }
+        return searchConnectedEntities( query );
 
-        List<ConnectionRef> crefs = new ArrayList<ConnectionRef>();
-        for ( Entity e : raw.getEntities() ) {
-            ConnectionRef cref = new ConnectionRefImpl( headEntity, connectionType, e );
-            crefs.add( cref );
-        }
 
-        return Results.fromConnections( crefs );
     }
 
 
@@ -1472,9 +1375,16 @@ public class CpRelationManager implements RelationManager {
                 } );
 
         query = adjustQuery( query );
-        CandidateResults crs = ei.search( indexScope, searchTypes, query );
 
-        return buildConnectionResults( indexScope, query, crs, connection );
+        final ConnectionResultsLoaderFactoryImpl resultsLoaderFactory = new ConnectionResultsLoaderFactoryImpl( managerCache,
+            headEntity, connection );
+
+        final QueryExecutor executor = new ElasticSearchQueryExecutor(resultsLoaderFactory, ei, applicationScope, indexScope, searchTypes, query);
+
+        return executor.next();
+//        CandidateResults crs = ei.search( indexScope, searchTypes, query );
+
+//        return buildConnectionResults( indexScope, query, crs, connection );
     }
 
 
@@ -1564,63 +1474,33 @@ public class CpRelationManager implements RelationManager {
         return entity;
     }
 
+//
+//    private Results buildConnectionResults( final IndexScope indexScope,
+//            final Query query, final CandidateResults crs, final String connectionType ) {
+//
+//        if ( query.getLevel().equals( Level.ALL_PROPERTIES ) ) {
+//            return buildResults( indexScope, query, crs, connectionType );
+//        }
+//
+//        final EntityRef sourceRef = new SimpleEntityRef( headEntity.getType(), headEntity.getUuid() );
+//
+//        List<ConnectionRef> refs = new ArrayList<ConnectionRef>( crs.size() );
+//
+//        for ( CandidateResult cr : crs ) {
+//
+//            SimpleEntityRef targetRef =
+//                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid() );
+//
+//            final ConnectionRef ref =
+//                    new ConnectionRefImpl( sourceRef, connectionType, targetRef );
+//
+//            refs.add( ref );
+//        }
+//
+//        return Results.fromConnections( refs );
+//    }
 
-    private Results buildConnectionResults( final IndexScope indexScope,
-            final Query query, final CandidateResults crs, final String connectionType ) {
-
-        if ( query.getLevel().equals( Level.ALL_PROPERTIES ) ) {
-            return buildResults( indexScope, query, crs, connectionType );
-        }
-
-        final EntityRef sourceRef = new SimpleEntityRef( headEntity.getType(), headEntity.getUuid() );
-
-        List<ConnectionRef> refs = new ArrayList<ConnectionRef>( crs.size() );
-
-        for ( CandidateResult cr : crs ) {
-
-            SimpleEntityRef targetRef =
-                    new SimpleEntityRef( cr.getId().getType(), cr.getId().getUuid() );
-
-            final ConnectionRef ref =
-                    new ConnectionRefImpl( sourceRef, connectionType, targetRef );
-
-            refs.add( ref );
-        }
-
-        return Results.fromConnections( refs );
-    }
-
-
-    /**
-     * Build results from a set of candidates, and discard those that represent stale indexes.
-     *
-     * @param query Query that was executed
-     * @param crs Candidates to be considered for results
-     * @param collName Name of collection or null if querying all types
-     */
-    private Results buildResults( final IndexScope indexScope, final Query query,
-            final CandidateResults crs, final String collName ) {
-
-        logger.debug( "buildResults() for {} from {} candidates", collName, crs.size() );
-
-        //get an instance of our results loader
-        final ResultsLoader resultsLoader = this.resultsLoaderFactory.getLoader(
-                applicationScope, indexScope, query.getResultsLevel() );
-
-        //load the results
-        final Results results = resultsLoader.loadResults( crs );
-
-        //signal for post processing
-        resultsLoader.postProcess();
-
-
-        results.setCursor( crs.getCursor() );
-        results.setQueryProcessor( new CpQueryProcessor( em, query, headEntity, collName ) );
-
-        logger.debug( "Returning results size {}", results.size() );
 
-        return results;
-    }
 
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
new file mode 100644
index 0000000..b74f433
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionRefsVerifier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.SimpleEntityRef;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+public class CollectionRefsVerifier extends VersionVerifier {
+
+
+
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
+        for ( Id id : ids ) {
+            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
+        }
+        return Results.fromRefList( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..b79700b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/CollectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+/**
+ * {@link ResultsLoaderFactory} for collection queries; selects the verifier matching the requested level.
+ */
+public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+
+    /** @param managerCache cache handed to every {@link FilteringLoader} this factory creates */
+    public CollectionResultsLoaderFactoryImpl( final ManagerCache managerCache ) {
+        this.managerCache = managerCache;
+    }
+
+    /**
+     * Creates a loader for one search.  REFS uses a {@link CollectionRefsVerifier}, IDS an {@link IdsVerifier};
+     * any other level (including {@code null}) uses an {@link EntityVerifier} built with {@link Query#MAX_LIMIT}.
+     */
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
+        final ResultsVerifier verifier;
+        if ( resultsLevel == Query.Level.REFS ) {
+            verifier = new CollectionRefsVerifier();
+        }
+        else if ( resultsLevel == Query.Level.IDS ) {
+            verifier = new IdsVerifier();
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
new file mode 100644
index 0000000..408edd3
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionRefsVerifier.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.usergrid.persistence.ConnectedEntityRef;
+import org.apache.usergrid.persistence.ConnectionRef;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.cassandra.ConnectedEntityRefImpl;
+import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
+
+
+/** Verifier that reports each supplied id as a {@link ConnectionRef} for a fixed owner and connection type. */
+public class ConnectionRefsVerifier extends VersionVerifier {
+
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+    /**
+     * @param ownerId owner passed to every {@link ConnectionRefImpl} created
+     * @param connectionType connection type passed to every {@link ConnectionRefImpl} created
+     */
+    public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+    /** Builds one connection ref per id (list presized to {@code ids.size()}) via {@link Results#fromConnections}. */
+    @Override
+    public Results getResults( final Collection<Id> ids ) {
+        final List<ConnectionRef> refs = new ArrayList<>( ids.size() );
+        for ( final Id id : ids ) {
+            refs.add( new ConnectionRefImpl( ownerId, connectionType, ref( id.getType(), id.getUuid() ) ) );
+        }
+        return Results.fromConnections( refs );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
new file mode 100644
index 0000000..ba8eb2c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ConnectionResultsLoaderFactoryImpl.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import org.apache.usergrid.corepersistence.ManagerCache;
+import org.apache.usergrid.persistence.EntityRef;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+/**
+ * {@link ResultsLoaderFactory} for connection queries scoped to one owner entity and connection type.
+ */
+public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
+
+    private final ManagerCache managerCache;
+    private final EntityRef ownerId;
+    private final String connectionType;
+
+
+    /** Stores the cache plus the owner and connection type given to each {@link ConnectionRefsVerifier}. */
+    public ConnectionResultsLoaderFactoryImpl( final ManagerCache managerCache, final EntityRef ownerId,
+                                               final String connectionType ) {
+        this.managerCache = managerCache;
+        this.ownerId = ownerId;
+        this.connectionType = connectionType;
+    }
+
+
+    /**
+     * Creates a loader for one search.  REFS and IDS both use a {@link ConnectionRefsVerifier}; any other
+     * level (including {@code null}) uses an {@link EntityVerifier} built with {@link Query#MAX_LIMIT}.
+     */
+    @Override
+    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
+        final ResultsVerifier verifier;
+        if ( resultsLevel == Query.Level.REFS || resultsLevel == Query.Level.IDS ) {
+            verifier = new ConnectionRefsVerifier( ownerId, connectionType );
+        }
+        else {
+            verifier = new EntityVerifier(Query.MAX_LIMIT);
+        }
+
+        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
new file mode 100644
index 0000000..535af36
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
+
+
+/**
+ * {@link QueryExecutor} that pages through an {@link EntityIndex} search.  Every {@link #next()} returns one page
+ * of {@link Results}; the cursor for the following page is kept on a private copy of the caller's query.
+ * {@link #iterator()} returns {@code this}, so an instance can only be iterated once.
+ */
+public class ElasticSearchQueryExecutor implements QueryExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+
+    private final ResultsLoaderFactory resultsLoaderFactory;
+    private final ApplicationScope applicationScope;
+    private final EntityIndex entityIndex;
+    private final IndexScope indexScope;
+    private final SearchTypes types;
+
+    /** Copy of the caller's query; its cursor and limit are updated while paging. */
+    private final Query query;
+
+    /** Page loaded by {@link #hasNext()} but not yet handed out by {@link #next()}; {@code null} if none. */
+    private Results currentResults;
+
+    /** {@code false} once a loaded page came back without a cursor. */
+    private boolean moreToLoad = true;
+
+    /**
+     * Captures the collaborators used for searching and loading.  The query is copied through its copy
+     * constructor so that cursor and limit updates made while paging are applied to the copy.
+     */
+    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final EntityIndex entityIndex,
+                                       final ApplicationScope applicationScope, final IndexScope indexScope,
+                                       final SearchTypes types, final Query query ) {
+        this.resultsLoaderFactory = resultsLoaderFactory;
+        this.applicationScope = applicationScope;
+        this.entityIndex = entityIndex;
+        this.indexScope = indexScope;
+        this.types = types;
+
+        //we must deep copy the query passed.  Otherwise we will modify its state with cursors.  Won't fix, not relevant
+        //once we start subscribing to streams.
+        this.query = new Query(query);
+    }
+
+    @Override
+    public Iterator<Results> iterator() {
+        return this;
+    }
+
+    /**
+     * Searches the index for the next page and stores it in {@link #currentResults}.  Because stale entities are
+     * filtered out by buildResults(), a search can yield no results while the candidates still carry a cursor; in
+     * that case the search is repeated from that cursor, at most {@code maxQueries} searches in total.
+     */
+    private void loadNextPage() {
+        final int maxQueries = 10; // max re-queries to satisfy query limit
+        final int originalLimit = query.getLimit();
+
+        Results results = null;
+        int queryCount = 0;
+        boolean satisfied = false;
+
+        while ( !satisfied && queryCount++ < maxQueries ) {
+
+            CandidateResults crs = entityIndex.search( indexScope, types, query );
+
+            logger.debug( "Calling build results 1" );
+            results = buildResults( indexScope, query, crs );
+
+            if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
+                satisfied = true;
+            }
+
+            /**
+             * In an edge case where we delete stale entities, we could potentially get fewer results than expected.
+             * This will only occur once during the repair phase.
+             * We need to ensure that we short circuit before we overflow the requested limit during a repair.
+             */
+            else if ( results.size() > 0 ) { // got what we need
+                satisfied = true;
+            }
+            //we didn't load anything, but there was a cursor (the first branch already ruled out "no cursor"), this
+            //means a read repair occurred.  Leave satisfied false and search again from the cursor
+            else {
+                // need to query for more
+                // ask for just what we need to satisfy, don't want to exceed limit
+                query.setCursor( results.getCursor() );
+                query.setLimit( originalLimit - results.size() );
+
+                logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                    originalLimit, query.getLimit(), queryCount
+                } );
+            }
+        }
+
+        //now set our cursor if we have one for the next iteration
+        if ( results.hasCursor() ) {
+            query.setCursor( results.getCursor() );
+            moreToLoad = true;
+        }
+        else {
+            moreToLoad = false;
+        }
+
+        //set our current results and the flag
+        this.currentResults = results;
+    }
+
+    /**
+     * Build results from a set of candidates, and discard those that represent stale indexes.
+     *
+     * @param indexScope Scope passed to the results loader factory
+     * @param query Query that was executed
+     * @param crs Candidates to be considered for results
+     * @return the loaded results, carrying the candidates' cursor and a back-reference to this executor
+     */
+    private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) {
+
+        logger.debug( "buildResults()  from {} candidates", crs.size() );
+
+        //get an instance of our results loader
+        final ResultsLoader resultsLoader =
+            this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+
+        //load the results
+        final Results results = resultsLoader.loadResults( crs );
+
+        //signal for post processing
+        resultsLoader.postProcess();
+
+        results.setCursor( crs.getCursor() );
+
+        //ugly and tight coupling, but we don't have a choice until we finish some refactoring
+        results.setQueryExecutor( this );
+
+        logger.debug( "Returning results size {}", results.size() );
+
+        return results;
+    }
+
+    /**
+     * Loads the next page if none is pending.
+     *
+     * @return {@code true} if a page is available for {@link #next()}
+     */
+    @Override
+    public boolean hasNext() {
+        //no page is pending: load the next one, unless the last loaded page had no cursor
+        if ( currentResults == null ) {
+            //there's nothing left to load, nothing to do
+            if ( !moreToLoad ) {
+                return false;
+            }
+
+            loadNextPage();
+        }
+
+        return currentResults != null;
+    }
+
+    /**
+     * @return the pending page, which is then cleared so the following call loads a new one
+     * @throws NoSuchElementException if there are no more pages
+     */
+    @Override
+    public Results next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No more results present" );
+        }
+
+        final Results toReturn = currentResults;
+        currentResults = null;
+
+        return toReturn;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
new file mode 100644
index 0000000..3afb77f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/QueryExecutor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.results;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.Results;
+
+
+/**
+ * A strategy interface for executing queries.  Each item in the iterator is a single collection of results.
+ * Each implementation should always return 1 element of Results, even if the results are empty.
+ *
+ * QueryExecutor.next() should always return a non-null Results object.
+ */
+public interface QueryExecutor extends Iterable<Results>, Iterator<Results> {
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
deleted file mode 100644
index 096c271..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/RefsVerifier.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.results;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class RefsVerifier extends VersionVerifier {
-
-    @Override
-    public Results getResults( final Collection<Id> ids ) {
-        List<EntityRef> refs = new ArrayList<EntityRef>();
-        for ( Id id : ids ) {
-            refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
-        }
-        return Results.fromRefList( refs );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
index 14fbf88..3fbfff9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactory.java
@@ -37,5 +37,6 @@ public interface ResultsLoaderFactory {
      * @param indexScope The index scope used in the search
      * @param
      */
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope indexScope, final Query.Level resultsLevel );
+    ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope indexScope,
+                             final Query.Level resultsLevel );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
deleted file mode 100644
index 892b736..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.results;
-
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.query.Query;
-
-import com.google.inject.Inject;
-
-
-/**
- * Factory for creating results
- */
-public class ResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
-    private final ManagerCache managerCache;
-
-
-    @Inject
-    public ResultsLoaderFactoryImpl( final ManagerCache managerCache ) {
-        this.managerCache = managerCache;
-    }
-
-
-    @Override
-    public ResultsLoader getLoader( final ApplicationScope applicationScope, final IndexScope scope, final Query.Level resultsLevel ) {
-
-        ResultsVerifier verifier;
-
-        if ( resultsLevel == Query.Level.REFS ) {
-            verifier = new RefsVerifier();
-        }
-        else if ( resultsLevel == Query.Level.IDS ) {
-            verifier = new RefsVerifier();
-        }
-        else {
-            verifier = new EntityVerifier(Query.MAX_LIMIT);
-        }
-
-        return new FilteringLoader( managerCache, verifier, applicationScope, scope );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
index 5b64d0b..52e235c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
@@ -24,7 +24,7 @@ import org.apache.usergrid.persistence.index.query.Query.Level;
 
 
 /**
- * For each in a set of source refs executes a sub-query and provides a unified iterator over 
+ * For each in a set of source refs executes a sub-query and provides a unified iterator over
  * the union of all results. Honors page sizes for the Query to ensure memory isn't blown out.
  */
 public class MultiQueryIterator implements ResultsIterator {
@@ -36,10 +36,6 @@ public class MultiQueryIterator implements ResultsIterator {
     private Iterator currentIterator;
 
 
-    public MultiQueryIterator( Results results, Query query ) {
-        this( results.getQueryProcessor().getEntityManager(), 
-                new PagingResultsIterator( results, Level.IDS ), query );
-    }
 
 
     public MultiQueryIterator( EntityManager entityManager, Iterator<EntityRef> source, Query query ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 388b29e..f179000 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 
 import javax.xml.bind.annotation.XmlRootElement;
 
+import org.apache.usergrid.corepersistence.results.QueryExecutor;
 import org.apache.usergrid.persistence.cassandra.QueryProcessor;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -81,9 +82,7 @@ public class Results implements Iterable<Entity> {
     Query query;
     Object data;
     String dataName;
-
-    private QueryProcessor queryProcessor;
-    private SearchVisitor searchVisitor;
+    private QueryExecutor queryExecutor;
 
 
     public Results() {
@@ -1268,31 +1267,18 @@ public class Results implements Iterable<Entity> {
     }
 
 
-    protected QueryProcessor getQueryProcessor() {
-        return queryProcessor;
-    }
-
-
-    public void setQueryProcessor( QueryProcessor queryProcessor ) {
-        this.queryProcessor = queryProcessor;
-    }
-
-
-    public void setSearchVisitor( SearchVisitor searchVisitor ) {
-        this.searchVisitor = searchVisitor;
+    public void setQueryExecutor(final QueryExecutor queryExecutor){
+        this.queryExecutor = queryExecutor;
     }
 
 
     /** uses cursor to get next batch of Results (returns null if no cursor) */
     public Results getNextPageResults() throws Exception {
-        if ( !hasCursor() ) {
+        if ( queryExecutor == null || !queryExecutor.hasNext() ) {
             return null;
         }
 
-        Query q = new Query( query );
-        q.setCursor( getCursor() );
-        queryProcessor.setQuery( q );
 
-        return queryProcessor.getResults( searchVisitor );
+        return queryExecutor.next();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
index 874ff88..94569d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/cassandra/QueryProcessorImpl.java
@@ -292,7 +292,7 @@ public class QueryProcessorImpl implements QueryProcessor {
             }
         }
         if (logger.isDebugEnabled()) {
-        	logger.debug("Getting result for query: [{}],  returning entityIds size: {}", 
+        	logger.debug("Getting result for query: [{}],  returning entityIds size: {}",
                     getQuery(), entityIds.size());
         }
 
@@ -307,8 +307,8 @@ public class QueryProcessorImpl implements QueryProcessor {
         results.setCursor( resultsCursor.asString() );
 
         results.setQuery( query );
-        results.setQueryProcessor( this );
-        results.setSearchVisitor( visitor );
+//        results.setQueryProcessor( this );
+//        results.setSearchVisitor( visitor );
 
         return results;
     }
@@ -621,12 +621,12 @@ public class QueryProcessorImpl implements QueryProcessor {
 
         @Override
         public QueryBuilder getQueryBuilder() {
-            throw new UnsupportedOperationException("Not supported by this vistor implementation."); 
+            throw new UnsupportedOperationException("Not supported by this vistor implementation.");
         }
 
         @Override
         public FilterBuilder getFilterBuilder() {
-            throw new UnsupportedOperationException("Not supported by this vistor implementation."); 
+            throw new UnsupportedOperationException("Not supported by this vistor implementation.");
         }
     }
 
@@ -724,4 +724,4 @@ public class QueryProcessorImpl implements QueryProcessor {
     public EntityManager getEntityManager() {
         return em;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
index c56c07f..6ab672a 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
@@ -27,8 +27,6 @@ import java.util.UUID;
 
 import org.junit.Test;
 
-import org.apache.commons.lang3.RandomStringUtils;
-
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.persistence.index.query.Query;
 import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -181,15 +179,45 @@ public class PathQueryIT extends AbstractCoreIT {
         deviceQuery.addFilter( "index >= 4" );
         int expectedDeviceQuerySize = 3;
 
-        PathQuery groupsPQ = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
-        PathQuery usersPQ = groupsPQ.chain( userQuery );
-        PathQuery<Entity> devicesPQ = usersPQ.chain( deviceQuery );
 
-        HashSet set = new HashSet( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize );
-        Iterator<Entity> i = devicesPQ.iterator( em );
+        final PathQuery groupsPQ = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+
+
+        //test 1 level deep
+        HashSet groupSet = new HashSet( expectedGroupQuerySize );
+        Iterator<Entity> groupsIterator = groupsPQ.iterator( em );
+
+        while ( groupsIterator.hasNext() ) {
+            groupSet.add( groupsIterator.next() );
+        }
+
+        assertEquals( expectedGroupQuerySize, groupSet.size() );
+
+        //test 2 levels
+
+        final PathQuery groupsPQ1 = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+        PathQuery usersPQ1 = groupsPQ1.chain( userQuery );
+        final Iterator<Entity> userIterator  = usersPQ1.iterator( em );
+
+        final HashSet userSet = new HashSet( expectedGroupQuerySize * expectedUserQuerySize );
+
+        while ( userIterator.hasNext() ) {
+            userSet.add( userIterator.next() );
+        }
+
+        assertEquals( expectedGroupQuerySize * expectedUserQuerySize, userSet.size() );
+
+
+// ORIGINAL TEST, restore
+        PathQuery groupsPQ2 = new PathQuery(new SimpleEntityRef( em.getApplicationRef() ), groupQuery );
+        PathQuery usersPQ2 = groupsPQ2.chain( userQuery );
+        PathQuery<Entity> devicesPQ2 = usersPQ2.chain( deviceQuery );
+
+        final HashSet deviceSet = new HashSet( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize );
+        Iterator<Entity> i = devicesPQ2.iterator( em );
         while ( i.hasNext() ) {
-            set.add( i.next() );
+            deviceSet.add( i.next() );
         }
-        assertEquals( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize, set.size() );
+        assertEquals( expectedGroupQuerySize * expectedUserQuerySize * expectedDeviceQuerySize, deviceSet.size() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/core/src/test/resources/log4j.properties b/stack/core/src/test/resources/log4j.properties
index 2b156b3..0ba16ea 100644
--- a/stack/core/src/test/resources/log4j.properties
+++ b/stack/core/src/test/resources/log4j.properties
@@ -44,6 +44,8 @@ log4j.logger.org.apache.usergrid.persistence.PerformanceEntityRebuildIndexTest=D
 #log4j.logger.org.apache.usergrid.persistence=INFO
 
 log4j.logger.org.apache.usergrid.corepersistence.migration=WARN
+
+log4j.logger.org.apache.usergrid.persistence.index.impl=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpSetup=INFO
 #log4j.logger.org.apache.usergrid.corepersistence=DEBUG
 #log4j.logger.org.apache.usergrid.corepersistence.CpEntityManagerFactory=DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 69e0422..e129209 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -963,16 +963,19 @@ public abstract class GraphManagerIT {
         Id targetId2 = new SimpleId( "target2" );
 
 
-        Edge edge1 = createEdge( sourceId, "test", targetId1, System.currentTimeMillis() );
+        final long edge1Time = System.currentTimeMillis();
+        final long edge2Time = edge1Time+1;
+
+        Edge edge1 = createEdge( sourceId, "test", targetId1, edge1Time );
 
         gm.writeEdge( edge1 ).toBlocking().singleOrDefault( null );
 
-        Edge edge2 = createEdge( sourceId, "test", targetId2, System.currentTimeMillis() );
+        Edge edge2 = createEdge( sourceId, "test", targetId2, edge2Time );
 
         gm.writeEdge( edge2 ).toBlocking().singleOrDefault( null );
 
 
-        final long maxVersion = System.currentTimeMillis();
+        final long maxVersion = edge2Time+1;
 
 
         assertTrue( Long.compare( maxVersion, edge2.getTimestamp() ) > 0 );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 6be8234..c7be79d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -24,8 +24,6 @@ import org.safehaus.guicyfig.FigSingleton;
 import org.safehaus.guicyfig.GuicyFig;
 import org.safehaus.guicyfig.Key;
 
-import org.apache.usergrid.persistence.index.guice.QueueProvider;
-
 
 @FigSingleton
 public interface IndexFig extends GuicyFig {
@@ -93,6 +91,13 @@ public interface IndexFig extends GuicyFig {
      */
     public static final String ELASTICSEARCH_QUEUE_IMPL = "elasticsearch.queue_impl";
 
+
+    /**
+     * Maximum time, in milliseconds, to wait for space when offering a message to the in-memory index queue.
+     */
+    public static final String ELASTICSEARCH_QUEUE_OFFER_TIMEOUT = "elasticsearch.queue.offer_timeout";
+
+
     public static final String QUERY_LIMIT_DEFAULT = "index.query.limit.default";
 
     @Default( "127.0.0.1" )
@@ -115,7 +120,7 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_ALIAS_POSTFIX )
     String getAliasPostfix();
 
-    @Default( "1" ) // TODO: does this timeout get extended on each query?
+    @Default( "5" ) // TODO: does this timeout get extended on each query?
     @Key( QUERY_CURSOR_TIMEOUT_MINUTES )
     int getQueryCursorTimeout();
 
@@ -193,7 +198,7 @@ public interface IndexFig extends GuicyFig {
     @Key( INDEX_QUEUE_READ_TIMEOUT )
     int getIndexQueueTimeout();
 
-    @Default("2")
+    @Default( "2" )
     @Key( ELASTICSEARCH_WORKER_COUNT )
     int getWorkerCount();
 
@@ -201,4 +206,7 @@ public interface IndexFig extends GuicyFig {
     @Key( ELASTICSEARCH_QUEUE_IMPL )
     String getQueueImplementation();
 
+    @Default( "1000" )
+    @Key( ELASTICSEARCH_QUEUE_OFFER_TIMEOUT )
+    long getQueueOfferTimeout();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
index 998c086..b6eaf89 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/BufferQueueInMemoryImpl.java
@@ -36,18 +36,26 @@ import com.google.inject.Singleton;
 @Singleton
 public class BufferQueueInMemoryImpl implements BufferQueue {
 
+
+    private final IndexFig fig;
     private final ArrayBlockingQueue<IndexOperationMessage> messages;
 
 
     @Inject
     public BufferQueueInMemoryImpl( final IndexFig fig ) {
+        this.fig = fig;
         messages = new ArrayBlockingQueue<>( fig.getIndexQueueSize() );
     }
 
 
     @Override
     public void offer( final IndexOperationMessage operation ) {
-        messages.offer( operation );
+        try {
+            messages.offer( operation, fig.getQueueOfferTimeout(), TimeUnit.MILLISECONDS );
+        }
+        catch ( InterruptedException e ) {
+            throw new RuntimeException("Unable to offer message to queue", e);
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index b31cf39..7e64de3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -19,18 +19,12 @@
  */
 package org.apache.usergrid.persistence.index.impl;
 
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.IndexBufferConsumer;
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexOperationMessage;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.elasticsearch.action.WriteConsistencyLevel;
 import org.elasticsearch.action.bulk.BulkItemResponse;
@@ -39,6 +33,19 @@ import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.client.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.IndexBufferConsumer;
+import org.apache.usergrid.persistence.index.IndexFig;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
@@ -47,11 +54,6 @@ import rx.functions.Func1;
 import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 /**
  * Consumer for IndexOperationMessages
@@ -189,7 +191,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         }
                         while ( true );
                     }
-                } ).subscribeOn( Schedulers.newThread() ).doOnNext( new Action1<List<IndexOperationMessage>>() {
+                } ).doOnNext( new Action1<List<IndexOperationMessage>>() {
                 @Override
                 public void call( List<IndexOperationMessage> containerList ) {
                     if ( containerList.size() == 0 ) {
@@ -203,7 +205,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     execute( containerList );
 
                     time.stop();
-
                 }
             } )
                 //ack after we process
@@ -214,15 +215,29 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         //release  so we know we've done processing
                         inFlight.addAndGet( -1 * indexOperationMessages.size() );
                     }
-                } ).doOnError( new Action1<Throwable>() {
+                } )
+                //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
+                .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
                     @Override
-                    public void call( final Throwable throwable ) {
+                    public List<IndexOperationMessage> call( final Throwable throwable ) {
+                        final long sleepTime = config.getFailureRetryTime();
+
+                        log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, throwable );
+
+                        try {
+                            Thread.sleep( sleepTime );
+                        }
+                        catch ( InterruptedException ie ) {
+                            //swallow
+                        }
 
-                        log.error( "An exception occurred when trying to deque and write to elasticsearch.  Ignoring",
-                            throwable );
                         indexErrorCounter.inc();
+
+                        return Collections.EMPTY_LIST;
                     }
-                } );
+                } )
+
+                .subscribeOn( Schedulers.newThread() );
 
             //start in the background
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index da68772..9a7a867 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -113,33 +113,41 @@ public class Query {
     }
 
 
+    /**
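+     * Creates a copy of another query. List and map fields are copied into new containers; all other fields are assigned as-is.
+     * @param q the query to copy; if null, nothing is copied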
+     */
     public Query( Query q ) {
-        if ( q != null ) {
-            type = q.type;
-            sortPredicates = q.sortPredicates != null
-                    ? new ArrayList<SortPredicate>( q.sortPredicates ) : null;
-            startResult = q.startResult;
-            cursor = q.cursor;
-            limit = q.limit;
-            selectAssignments = q.selectAssignments != null
-                    ? new LinkedHashMap<String, String>( q.selectAssignments ) : null;
-            mergeSelectResults = q.mergeSelectResults;
-            //level = q.level;
-            connectionType = q.connectionType;
-            permissions = q.permissions != null ? new ArrayList<String>( q.permissions ) : null;
-            reversed = q.reversed;
-            reversedSet = q.reversedSet;
-            startTime = q.startTime;
-            finishTime = q.finishTime;
-            resolution = q.resolution;
-            pad = q.pad;
-            rootOperand = q.rootOperand;
-            identifiers = q.identifiers != null
-                    ? new ArrayList<Identifier>( q.identifiers ) : null;
-            counterFilters = q.counterFilters != null
-                    ? new ArrayList<CounterFilterPredicate>( q.counterFilters ) : null;
-            collection = q.collection;
+        if ( q == null ) {
+            return;
         }
+
+        type = q.type;
+        sortPredicates = q.sortPredicates != null
+                ? new ArrayList<>( q.sortPredicates ) : null;
+        startResult = q.startResult;
+        cursor = q.cursor;
+        limit = q.limit;
+        selectAssignments = q.selectAssignments != null
+                ? new LinkedHashMap<>( q.selectAssignments ) : null;
+        mergeSelectResults = q.mergeSelectResults;
+        //level = q.level;
+        connectionType = q.connectionType;
+        permissions = q.permissions != null ? new ArrayList<>( q.permissions ) : null;
+        reversed = q.reversed;
+        reversedSet = q.reversedSet;
+        startTime = q.startTime;
+        finishTime = q.finishTime;
+        resolution = q.resolution;
+        pad = q.pad;
+        rootOperand = q.rootOperand;
+        identifiers = q.identifiers != null
+                ? new ArrayList<>( q.identifiers ) : null;
+        counterFilters = q.counterFilters != null
+                ? new ArrayList<>( q.counterFilters ) : null;
+        collection = q.collection;
+        level = q.level;
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index 6c7b480..9c768bf 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang.math.NumberUtils;
-import org.apache.usergrid.persistence.collection.util.EntityUtils;
-import org.apache.usergrid.persistence.model.entity.Id;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c7fa864f/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index ee3f9cd..aea9d49 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -121,7 +121,7 @@
       <!-- only use half the cores on the machine for testing -->
       <usergrid.it.parallel>methods</usergrid.it.parallel>
       <usergrid.it.reuseForks>true</usergrid.it.reuseForks>
-      <usergrid.it.forkCount>4</usergrid.it.forkCount>
+      <usergrid.it.forkCount>1</usergrid.it.forkCount>
       <usergrid.it.threads>8</usergrid.it.threads>
 
       <metrics.version>3.0.0</metrics.version>


[10/10] incubator-usergrid git commit: fix cursor

Posted by sf...@apache.org.
fix cursor


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2df40137
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2df40137
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2df40137

Branch: refs/heads/USERGRID-501
Commit: 2df40137c9d96ec05f82605e77ebcaeba016a420
Parents: 96d27b9
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Mar 26 14:23:04 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Mar 26 14:23:04 2015 -0600

----------------------------------------------------------------------
 .../impl/EsApplicationEntityIndexImpl.java      | 11 +++++++---
 .../migration/EsIndexDataMigrationImpl.java     |  2 +-
 .../index/guice/TestIndexModule.java            | 22 ++++++++++++++++++++
 3 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2df40137/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 603bee4..53286b4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -142,6 +142,10 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
     @Override
     public CandidateResults search(final IndexScope indexScope, final SearchTypes searchTypes, final Query query, final int limit){
 
+        if(query.getCursor()!=null){
+            return getNextPage(query.getCursor());
+        }
+
         SearchResponse searchResponse;
 
         SearchRequestBuilder srb = searchRequest.getBuilder(indexScope, searchTypes, query,limit);
@@ -291,11 +295,12 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex{
             candidates.add(new CandidateResult(entityId, UUID.fromString(version)));
         }
 
-        CandidateResults candidateResults = new CandidateResults(candidates);
-        if(candidateResults.hasCursor()) {
+        final CandidateResults candidateResults = new CandidateResults(candidates);
+        final String esScrollCursor = searchResponse.getScrollId();
+
+        if(esScrollCursor != null) {
             candidateResults.initializeCursor();
 
-            final String esScrollCursor = searchResponse.getScrollId();
             //now set this into our map module
             final int minutes = indexFig.getQueryCursorTimeout();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2df40137/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
index 2a0bb15..64b7d29 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/migration/EsIndexDataMigrationImpl.java
@@ -80,7 +80,7 @@ public class EsIndexDataMigrationImpl implements DataMigration<ApplicationScope>
         .doOnError(error -> log.error("failed to migrate index",error))
         .toBlocking().last();
 
-        return integer.get();
+        return dataVersion.getImplementationVersion();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2df40137/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
index 4e6700c..0c57b36 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/TestIndexModule.java
@@ -19,10 +19,18 @@
 package org.apache.usergrid.persistence.index.guice;
 
 
+import com.google.inject.TypeLiteral;
+import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.core.guice.TestModule;
+import rx.Observable;
+
+import java.util.UUID;
 
 
 public class TestIndexModule extends TestModule {
@@ -37,8 +45,22 @@ public class TestIndexModule extends TestModule {
             @Override
             public  void configureMigrationProvider(){
 
+                bind( new TypeLiteral<MigrationDataProvider<ApplicationScope>>() {} ).to(
+                    TestAllApplicationsObservable.class );
             }
         });
         install( new GuicyFigModule(IndexTestFig.class) );
     }
+    public static class TestAllApplicationsObservable implements MigrationDataProvider<ApplicationScope>{
+
+
+        @Override
+        public Observable<ApplicationScope> getData() {
+            ApplicationScope[] scopes = new ApplicationScope[]{
+                new ApplicationScopeImpl(new SimpleId(UUID.randomUUID(),"application"))
+            };
+            return Observable.from(scopes);
+        }
+    }
+
 }