You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/26 21:23:29 UTC
[04/10] incubator-usergrid git commit: Merge branch 'two-dot-o' into
two-dot-o-dev
Merge branch 'two-dot-o' into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6db7ce93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6db7ce93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6db7ce93
Branch: refs/heads/USERGRID-501
Commit: 6db7ce937b008b26f3f06e9a6fbe295b4b8d7dc3
Parents: 66fdc61 beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:31:07 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:31:31 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 304 ++++++-------------
.../results/CollectionRefsVerifier.java | 44 +++
.../CollectionResultsLoaderFactoryImpl.java | 60 ++++
.../results/ConnectionRefsVerifier.java | 61 ++++
.../ConnectionResultsLoaderFactoryImpl.java | 65 ++++
.../results/ElasticSearchQueryExecutor.java | 212 +++++++++++++
.../corepersistence/results/QueryExecutor.java | 37 +++
.../corepersistence/results/RefsVerifier.java | 42 ---
.../results/ResultsLoaderFactory.java | 3 +-
.../results/ResultsLoaderFactoryImpl.java | 62 ----
.../persistence/MultiQueryIterator.java | 6 +-
.../apache/usergrid/persistence/Results.java | 26 +-
.../cassandra/QueryProcessorImpl.java | 12 +-
.../usergrid/persistence/PathQueryIT.java | 46 ++-
stack/core/src/test/resources/log4j.properties | 2 +
.../persistence/graph/GraphManagerIT.java | 2 +-
.../usergrid/persistence/index/IndexFig.java | 16 +-
.../index/impl/BufferQueueInMemoryImpl.java | 10 +-
.../index/impl/EsIndexBufferConsumerImpl.java | 61 ++--
.../usergrid/persistence/index/query/Query.java | 58 ++--
.../persistence/index/utils/ListUtils.java | 3 +-
.../cassandra/ManagementServiceImpl.java | 18 +-
.../providers/PingIdentityProvider.java | 5 +-
23 files changed, 731 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index f505fa3,da39ea9..5416f19
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@@ -76,9 -76,11 +78,9 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.index.EntityIndex;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
- import org.apache.usergrid.persistence.index.query.CandidateResult;
- import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.Level;
@@@ -275,9 -275,9 +275,9 @@@ public class CpRelationManager implemen
});
Observable<String> types= gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ));
+ new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeTypePrefix, null ) );
- Iterator<String> iter = types.toBlockingObservable().getIterator();
+ Iterator<String> iter = types.toBlocking().getIterator();
while ( iter.hasNext() ) {
indexes.add( iter.next() );
}
@@@ -317,59 -318,51 +318,47 @@@
* @param edgeType Edge type, edge type prefix or null to allow any edge type
* @param fromEntityType Only consider edges from entities of this type
*/
- Map<EntityRef, Set<String>> getContainers( int limit, String edgeType, String fromEntityType ) {
-
- Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
-
- GraphManager gm = managerCache.getGraphManager(applicationScope);
-
- Iterator<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType(
- cpHeadEntity.getId(), edgeType, null) ).toBlocking().getIterator();
-
- logger.debug("getContainers(): "
- + "Searched for edges of type {}\n to target {}:{}\n in scope {}\n found: {}",
- new Object[] {
- edgeType,
- cpHeadEntity.getId().getType(),
- cpHeadEntity.getId().getUuid(),
- applicationScope.getApplication(),
- edgeTypes.hasNext()
- });
+ Map<EntityRef, Set<String>> getContainers( final int limit, final String edgeType, final String fromEntityType ) {
- while ( edgeTypes.hasNext() ) {
- Map<EntityRef, Set<String>> results = new LinkedHashMap<EntityRef, Set<String>>();
--
- String etype = edgeTypes.next();
+ final GraphManager gm = managerCache.getGraphManager( applicationScope );
- Observable<Edge> edges = gm.loadEdgesToTarget( new SimpleSearchByEdgeType(
- cpHeadEntity.getId(), etype, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, null ));
+ Observable<Edge> edges =
+ gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( cpHeadEntity.getId(), edgeType, null ) )
+ .flatMap( new Func1<String, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final String edgeType ) {
+ return gm.loadEdgesToTarget(
+ new SimpleSearchByEdgeType( cpHeadEntity.getId(), edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
+
+ }
+ } );
+
+ //if our limit is set, take them. Note this logic is still borked, we can't possibly fit everything in memory
+ if ( limit > -1 ) {
+ edges = edges.take( limit );
+ }
- Iterator<Edge> iter = edges.toBlocking().getIterator();
- while ( iter.hasNext() ) {
- Edge edge = iter.next();
- if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() )) {
- logger.debug("Ignoring edge from entity type {}", edge.getSourceNode().getType());
- continue;
- return edges.collect( results, new Action2<Map<EntityRef, Set<String>>, Edge>() {
- @Override
- public void call( final Map<EntityRef, Set<String>> entityRefSetMap, final Edge edge ) {
++ return edges.collect( () -> new LinkedHashMap<EntityRef, Set<String>>(), ( entityRefSetMap, edge) -> {
+ if ( fromEntityType != null && !fromEntityType.equals( edge.getSourceNode().getType() ) ) {
+ logger.debug( "Ignoring edge from entity type {}", edge.getSourceNode().getType() );
+ return;
}
- EntityRef eref = new SimpleEntityRef(
- edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
+ final EntityRef eref =
+ new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
- String name = null;
- if ( CpNamingUtils.isConnectionEdgeType( edge.getType() )) {
+ String name;
+ if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
name = CpNamingUtils.getConnectionType( edge.getType() );
- } else {
+ }
+ else {
name = CpNamingUtils.getCollectionName( edge.getType() );
}
- addMapSet( results, eref, name );
+ addMapSet( entityRefSetMap, eref, name );
}
-
- if ( limit > 0 && results.keySet().size() >= limit ) {
- break;
- }
- }
-
- return results;
- } ).toBlocking().last();
++ ).toBlocking().last();
}
@@@ -689,18 -679,16 +675,16 @@@
// create graph edge connection from head entity to member entity
Edge edge = new SimpleEdge( cpHeadEntity.getId(), edgeType, memberEntity.getId(), uuidHash );
GraphManager gm = managerCache.getGraphManager( applicationScope );
- gm.writeEdge( edge ).toBlockingObservable().last();
+ gm.writeEdge( edge ).toBlocking().last();
- logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}",
- new Object[] {
- edgeType,
- cpHeadEntity.getId().getType(),
- cpHeadEntity.getId().getUuid(),
- memberEntity.getId().getType(),
- memberEntity.getId().getUuid(),
- applicationScope.getApplication().getType(),
+
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Wrote edgeType {}\n from {}:{}\n to {}:{}\n scope {}:{}", new Object[] {
+ edgeType, cpHeadEntity.getId().getType(), cpHeadEntity.getId().getUuid(), memberEntity.getId().getType(),
+ memberEntity.getId().getUuid(), applicationScope.getApplication().getType(),
applicationScope.getApplication().getUuid()
- } );
+ } );
+ }
( ( CpEntityManager ) em ).indexEntityIntoCollection( cpHeadEntity, memberEntity, collName );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
index 0000000,535af36..fea21ae
mode 000000,100644..100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ElasticSearchQueryExecutor.java
@@@ -1,0 -1,211 +1,212 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ package org.apache.usergrid.corepersistence.results;
+
+
+ import java.util.Iterator;
+ import java.util.NoSuchElementException;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import org.apache.usergrid.persistence.Results;
+ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
++import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+ import org.apache.usergrid.persistence.index.EntityIndex;
+ import org.apache.usergrid.persistence.index.IndexScope;
+ import org.apache.usergrid.persistence.index.SearchTypes;
+ import org.apache.usergrid.persistence.index.query.CandidateResults;
+ import org.apache.usergrid.persistence.index.query.Query;
+
+
+ public class ElasticSearchQueryExecutor implements QueryExecutor {
+
+ private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
+
+ private final ResultsLoaderFactory resultsLoaderFactory;
+
+ private final ApplicationScope applicationScope;
+
- private final EntityIndex entityIndex;
++ private final ApplicationEntityIndex entityIndex;
+
+ private final IndexScope indexScope;
+
+ private final SearchTypes types;
+
+ private final Query query;
+
+
+ private Results currentResults;
+
+ private boolean moreToLoad = true;
+
+
- public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final EntityIndex entityIndex,
++ public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
+ final ApplicationScope applicationScope, final IndexScope indexScope,
+ final SearchTypes types, final Query query ) {
+ this.resultsLoaderFactory = resultsLoaderFactory;
+ this.applicationScope = applicationScope;
+ this.entityIndex = entityIndex;
+ this.indexScope = indexScope;
+ this.types = types;
+
+ //we must deep copy the query passed. Otherwise we will modify its state with cursors. Won't fix, not relevant
+ //once we start subscribing to streams.
+ this.query = new Query(query);
+ }
+
+
+ @Override
+ public Iterator<Results> iterator() {
+ return this;
+ }
+
+
+ private void loadNextPage() {
+ // Because of possible stale entities, which are filtered out by buildResults(),
+ // we loop until we've got enough results to satisfy the query limit.
+
+ final int maxQueries = 10; // max re-queries to satisfy query limit
+
+ final int originalLimit = query.getLimit();
+
+ Results results = null;
+ int queryCount = 0;
+
+ boolean satisfied = false;
+
+
+ while ( !satisfied && queryCount++ < maxQueries ) {
+
+ CandidateResults crs = entityIndex.search( indexScope, types, query );
+
+ logger.debug( "Calling build results 1" );
+ results = buildResults( indexScope, query, crs );
+
+ if ( crs.isEmpty() || !crs.hasCursor() ) { // no results, no cursor, can't get more
+ satisfied = true;
+ }
+
+ /**
+ * In an edge case where we delete stale entities, we could potentially get fewer results than expected.
+ * This will only occur once during the repair phase.
+ * We need to ensure that we short circuit before we overflow the requested limit during a repair.
+ */
+ else if ( results.size() > 0 ) { // got what we need
+ satisfied = true;
+ }
+ //we didn't load anything, but there was a cursor, this means a read repair occurred. We have to short
+ //circuit to avoid over returning the result set
+ else if ( crs.hasCursor() ) {
+ satisfied = false;
+
+ // need to query for more
+ // ask for just what we need to satisfy, don't want to exceed limit
+ query.setCursor( results.getCursor() );
+ query.setLimit( originalLimit - results.size() );
+
+ logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
+ originalLimit, query.getLimit(), queryCount
+ } );
+ }
+ }
+
+ //now set our cursor if we have one for the next iteration
+ if ( results.hasCursor() ) {
+ query.setCursor( results.getCursor() );
+ moreToLoad = true;
+ }
+
+ else {
+ moreToLoad = false;
+ }
+
+
+ //set our current results and the flag
+ this.currentResults = results;
+ }
+
+
+ /**
+ * Build results from a set of candidates, and discard those that represent stale indexes.
+ *
+ * @param query Query that was executed
+ * @param crs Candidates to be considered for results
+ */
+ private Results buildResults( final IndexScope indexScope, final Query query, final CandidateResults crs ) {
+
+ logger.debug( "buildResults() from {} candidates", crs.size() );
+
+ //get an instance of our results loader
+ final ResultsLoader resultsLoader =
+ this.resultsLoaderFactory.getLoader( applicationScope, indexScope, query.getResultsLevel() );
+
+ //load the results
+ final Results results = resultsLoader.loadResults( crs );
+
+ //signal for post processing
+ resultsLoader.postProcess();
+
+
+ results.setCursor( crs.getCursor() );
+
+ //ugly and tight coupling, but we don't have a choice until we finish some refactoring
+ results.setQueryExecutor( this );
+
+ logger.debug( "Returning results size {}", results.size() );
+
+ return results;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+
+ //we've tried to load and it's empty and we have more to load, load the next page
+ if ( currentResults == null ) {
+ //there's nothing left to load, nothing to do
+ if ( !moreToLoad ) {
+ return false;
+ }
+
+ //load the page
+
+ loadNextPage();
+ }
+
+
+ //see if our current results are not null
+ return currentResults != null;
+ }
+
+
+ @Override
+ public Results next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more results present" );
+ }
+
+ final Results toReturn = currentResults;
+
+ currentResults = null;
+
+ return toReturn;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/core/src/test/java/org/apache/usergrid/persistence/PathQueryIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6db7ce93/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
index d588476,9c768bf..26395b4
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/utils/ListUtils.java
@@@ -25,9 -27,7 +27,8 @@@ import org.apache.commons.lang.math.Num
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import org.apache.commons.lang.math.NumberUtils;
+
public class ListUtils extends org.apache.commons.collections.ListUtils {
private static final Logger LOG = LoggerFactory.getLogger( ListUtils.class );