You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/26 21:23:29 UTC

[04/10] incubator-usergrid git commit: Merge branch 'two-dot-o' into two-dot-o-dev

Merge branch 'two-dot-o' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6db7ce93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6db7ce93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6db7ce93

Branch: refs/heads/USERGRID-501
Commit: 6db7ce937b008b26f3f06e9a6fbe295b4b8d7dc3
Parents: 66fdc61 beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:31:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:31:31 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      | 304 ++++++-------------
 .../results/CollectionRefsVerifier.java         |  44 +++
 .../CollectionResultsLoaderFactoryImpl.java     |  60 ++++
 .../results/ConnectionRefsVerifier.java         |  61 ++++
 .../ConnectionResultsLoaderFactoryImpl.java     |  65 ++++
 .../results/ElasticSearchQueryExecutor.java     | 212 +++++++++++++
 .../corepersistence/results/QueryExecutor.java  |  37 +++
 .../corepersistence/results/RefsVerifier.java   |  42 ---
 .../results/ResultsLoaderFactory.java           |   3 +-
 .../results/ResultsLoaderFactoryImpl.java       |  62 ----
 .../persistence/MultiQueryIterator.java         |   6 +-
 .../apache/usergrid/persistence/Results.java    |  26 +-
 .../cassandra/QueryProcessorImpl.java           |  12 +-
 .../usergrid/persistence/PathQueryIT.java       |  46 ++-
 stack/core/src/test/resources/log4j.properties  |   2 +
 .../persistence/graph/GraphManagerIT.java       |   2 +-
 .../usergrid/persistence/index/IndexFig.java    |  16 +-
 .../index/impl/BufferQueueInMemoryImpl.java     |  10 +-
 .../index/impl/EsIndexBufferConsumerImpl.java   |  61 ++--
 .../usergrid/persistence/index/query/Query.java |  58 ++--
 .../persistence/index/utils/ListUtils.java      |   3 +-
 .../cassandra/ManagementServiceImpl.java        |  18 +-
 .../providers/PingIdentityProvider.java         |   5 +-
 23 files changed, 731 insertions(+), 424 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index f505fa3,da39ea9..5416f19
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@@ -76,9 -76,11 +78,9 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
  import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 -import org.apache.usergrid.persistence.index.EntityIndex;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndexBatch;
 -import org.apache.usergrid.persistence.index.IndexScope;
 -import org.apache.usergrid.persistence.index.SearchTypes;
  import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
- import org.apache.usergrid.persistence.index.query.CandidateResult;
- import org.apache.usergrid.persistence.index.query.CandidateResults;
  import org.apache.usergrid.persistence.index.query.Identifier;
  import org.apache.usergrid.persistence.index.query.Query;
  import org.apache.usergrid.persistence.index.query.Query.Level;
@@@ -275,9 -275,9 +275,9 @@@ public class CpRelationManager implemen
          });
  
          Observable<String> types= gm.getEdgeTypesFromSource(
-             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix,  null ));
+             new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
  
 -        Iterator<String> iter = types.toBlockingObservable().getIterator();
 +        Iterator<String> iter = types.toBlocking().getIterator();
          while ( iter.hasNext() ) {
              indexes.add( iter.next() );
          }
@@@ -317,59 -318,51 +318,47 @@@
       * @param edgeType Edge type, edge type prefix or null to allow any edge type
       * @param fromEntityType Only consider edges from entities of this type
       */
-     Map<EntityRef, Set<String>> getContainers( int limit, String edgeType, String fromEntityType ) {
- 
-         Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
- 
-         GraphManager gm = managerCache.getGraphManager(applicationScope);
- 
-         Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
-             cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
- 
-         logger.debug("getContainers(): "
-                 + "Searched for edges of type {}\n   to target {}:{}\n   in scope {}\n   found: {}",
-             new Object[] {
-                 edgeType,
-                 cpHeadEntity.getId().getType(),
-                 cpHeadEntity.getId().getUuid(),
-                 applicationScope.getApplication(),
-                 edgeTypes.hasNext()
-         });
+     Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
  
-         while ( edgeTypes.hasNext() ) {
 -        Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
--
-             String etype = edgeTypes.next();
+         final GraphManager gm = managerCache.getGraphManager( applicationScope );
  
-             Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
-                 cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+         Observable<Edge> edges =
+             gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
+               .flatMap( new Func1<String, Observable<Edge>>() {
+                   @Override
+                   public Observable<Edge> call( final String edgeType ) {
+                       return gm.loadEdgesToTarget(
+                           new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+                               SearchByEdgeType.Order.DESCENDING, null ) );
+ 
+                   }
+               } );
+ 
+         //if our limit is set, take them.  Note this logic is still borked, we can't possibly fit everything in memory
+         if ( limit > -1 ) {
+             edges = edges.take( limit );
+         }
  
-             Iterator<Edge> iter = edges.toBlocking().getIterator();
-             while ( iter.hasNext() ) {
-                 Edge edge = iter.next();
  
-                 if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
-                     logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
-                     continue;
 -        return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() {
 -            @Override
 -            public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) {
++        return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge) -> {
+                 if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+                     logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
+                     return;
                  }
  
-                 EntityRef eref = new SimpleEntityRef(
-                         edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+                 final EntityRef eref =
+                     new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
  
-                 String name = null;
-                 if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+                 String name;
+                 if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
                      name = CpNamingUtils.getConnectionType( edge.getType() );
-                 } else {
+                 }
+                 else {
                      name = CpNamingUtils.getCollectionName( edge.getType() );
                  }
-                 addMapSet( results, eref, name );
+                 addMapSet( entityRefSetMap, eref, name );
              }
- 
-             if ( limit > 0 && results.keySet().size() >= limit ) {
-                 break;
-             }
-         }
- 
-         return results;
 -        } ).toBlocking().last();
++         ).toBlocking().last();
      }
  
  
@@@ -689,18 -679,16 +675,16 @@@
          // create graph edge connection from head entity to member entity
          Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
          GraphManager gm = managerCache.getGraphManager( applicationScope );
 -        gm.writeEdge( edge ).toBlockingObservable().last();
 +        gm.writeEdge( edge ).toBlocking().last();
  
-         logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}",
-             new Object[] {
-                 edgeType,
-                 cpHeadEntity.getId().getType(),
-                 cpHeadEntity.getId().getUuid(),
-                 memberEntity.getId().getType(),
-                 memberEntity.getId().getUuid(),
-                 applicationScope.getApplication().getType(),
+ 
+         if(logger.isDebugEnabled()) {
+             logger.debug( "Wrote edgeType {}\n   from {}:{}\n   to {}:{}\n   scope {}:{}", new Object[] {
+                 edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
+                 memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
                  applicationScope.getApplication().getUuid()
-         } );
+             } );
+         }
  
          ( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
index 0000000,535af36..fea21ae
mode 000000,100644..100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
@@@ -1,0 -1,211 +1,212 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ 
+ package org.apache.usergrid.corepersistence.results;
+ 
+ 
+ import java.util.Iterator;
+ import java.util.NoSuchElementException;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.usergrid.persistence.Results;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndex;
+ import org.apache.usergrid.persistence.index.IndexScope;
+ import org.apache.usergrid.persistence.index.SearchTypes;
+ import org.apache.usergrid.persistence.index.query.CandidateResults;
+ import org.apache.usergrid.persistence.index.query.Query;
+ 
+ 
+ public class ElasticSearchQueryExecutor implements QueryExecutor {
+ 
+     private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+ 
+     private final ResultsLoaderFactory resultsLoaderFactory;
+ 
+     private final ApplicationScope applicationScope;
+ 
 -    private final EntityIndex entityIndex;
++    private final ApplicationEntityIndex entityIndex;
+ 
+     private final IndexScope indexScope;
+ 
+     private final SearchTypes types;
+ 
+     private final Query query;
+ 
+ 
+     private Results currentResults;
+ 
+     private boolean moreToLoad = true;
+ 
+ 
 -    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final EntityIndex entityIndex,
++    public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
+                                        final ApplicationScope applicationScope, final IndexScope indexScope,
+                                        final SearchTypes types, final Query query ) {
+         this.resultsLoaderFactory = resultsLoaderFactory;
+         this.applicationScope = applicationScope;
+         this.entityIndex = entityIndex;
+         this.indexScope = indexScope;
+         this.types = types;
+ 
+         //we must deep copy the query passed.  Otherwise we will modify its state with cursors.  Won't fix, not relevant
+         //once we start subscribing to streams.
+         this.query = new Query(query);
+     }
+ 
+ 
+     @Override
+     public Iterator<Results> iterator() {
+         return this;
+     }
+ 
+ 
+     private void loadNextPage() {
+         // Because of possible stale entities, which are filtered out by buildResults(),
+         // we loop until we've got enough results to satisfy the query limit.
+ 
+         final int maxQueries = 10; // max re-queries to satisfy query limit
+ 
+         final int originalLimit = query.getLimit();
+ 
+         Results results = null;
+         int queryCount = 0;
+ 
+         boolean satisfied = false;
+ 
+ 
+         while ( !satisfied && queryCount++ < maxQueries ) {
+ 
+             CandidateResults crs = entityIndex.search( indexScope, types, query );
+ 
+             logger.debug( "Calling build results 1" );
+             results = buildResults( indexScope, query, crs );
+ 
+             if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
+                 satisfied = true;
+             }
+ 
+             /**
+              * In an edge case where we delete stale entities, we could potentially get less results than expected.
+              * This will only occur once during the repair phase.
+              * We need to ensure that we short circuit before we overflow the requested limit during a repair.
+              */
+             else if ( results.size() > 0 ) { // got what we need
+                 satisfied = true;
+             }
+             //we didn't load anything, but there was a cursor, this means a read repair occurred.  We have to short
+             //circuit to avoid over returning the result set
+             else if ( crs.hasCursor() ) {
+                 satisfied = false;
+ 
+                 // need to query for more
+                 // ask for just what we need to satisfy, don't want to exceed limit
+                 query.setCursor( results.getCursor() );
+                 query.setLimit( originalLimit - results.size() );
+ 
+                 logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+                     originalLimit, query.getLimit(), queryCount
+                 } );
+             }
+         }
+ 
+         //now set our cursor if we have one for the next iteration
+         if ( results.hasCursor() ) {
+             query.setCursor( results.getCursor() );
+             moreToLoad = true;
+         }
+ 
+         else {
+             moreToLoad = false;
+         }
+ 
+ 
+         //set our current results and the flag
+         this.currentResults = results;
+     }
+ 
+ 
+     /**
+      * Build results from a set of candidates, and discard those that represent stale indexes.
+      *
+      * @param query Query that was executed
+      * @param crs Candidates to be considered for results
+      */
+     private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) {
+ 
+         logger.debug( "buildResults()  from {} candidates", crs.size() );
+ 
+         //get an instance of our results loader
+         final ResultsLoader resultsLoader =
+             this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+ 
+         //load the results
+         final Results results = resultsLoader.loadResults( crs );
+ 
+         //signal for post processing
+         resultsLoader.postProcess();
+ 
+ 
+         results.setCursor( crs.getCursor() );
+ 
+         //ugly and tight coupling, but we don't have a choice until we finish some refactoring
+         results.setQueryExecutor( this );
+ 
+         logger.debug( "Returning results size {}", results.size() );
+ 
+         return results;
+     }
+ 
+ 
+     @Override
+     public boolean hasNext() {
+ 
+         //we've tried to load and it's empty and we have more to load, load the next page
+         if ( currentResults == null ) {
+             //there's nothing left to load, nothing to do
+             if ( !moreToLoad ) {
+                 return false;
+             }
+ 
+             //load the page
+ 
+             loadNextPage();
+         }
+ 
+ 
+         //see if our current results are not null
+         return currentResults != null;
+     }
+ 
+ 
+     @Override
+     public Results next() {
+         if ( !hasNext() ) {
+             throw new NoSuchElementException( "No more results present" );
+         }
+ 
+         final Results toReturn = currentResults;
+ 
+         currentResults = null;
+ 
+         return toReturn;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index d588476,9c768bf..26395b4
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@@ -25,9 -27,7 +27,8 @@@ import org.apache.commons.lang.math.Num
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import org.apache.commons.lang.math.NumberUtils;
  
 +
  public class ListUtils extends org.apache.commons.collections.ListUtils {
      private static final Logger LOG = LoggerFactory.getLogger( ListUtils.class );