You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/27 19:28:44 UTC
[06/19] incubator-usergrid git commit: [USERGRID-677] Added a new
SearchRequestBuilderStrategy that allows you to make term post filters easily
without the cruft of the regular SearchRequestBuilderStrategy. Added a
version of the scrolling api and an add
[USERGRID-677] Added a new SearchRequestBuilderStrategy that allows you to make term post filters easily without the cruft of the regular SearchRequestBuilderStrategy.
Added a version of the scrolling api and an additional parser that will let us aggregate the from the constant queries and return a single CandidateResults set.
Added a test that currently just checks to make sure the new scrolling search still functions as a regular search ( it does! Yay! ).
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf823c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf823c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf823c47
Branch: refs/heads/two-dot-o-dev
Commit: cf823c47d3fc2bb92198e7d1e65ecc4be96ccfcc
Parents: 11648ab
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 09:53:44 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 09:53:44 2015 -0700
----------------------------------------------------------------------
.../SearchRequestBuilderStrategyV2.java | 69 ++++++++++++++
.../impl/EsApplicationEntityIndexImpl.java | 98 ++++++++++++++++++--
2 files changed, 160 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
new file mode 100644
index 0000000..0511b75
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder;
+
+
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.index.impl.IndexAlias;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+
+
+/**
+ * The class to use when creating new custom queries for specific terms to elasticsearch.
+ */
+public class SearchRequestBuilderStrategyV2 {
+ private static final Logger logger = LoggerFactory.getLogger( SearchRequestBuilderStrategyV2.class );
+
+ private final EsProvider esProvider;
+ private final ApplicationScope applicationScope;
+ private final IndexAlias alias;
+ private final int cursorTimeout;
+
+
+ public SearchRequestBuilderStrategyV2( final EsProvider esProvider, final ApplicationScope applicationScope,
+ final IndexAlias alias, int cursorTimeout ) {
+
+ this.esProvider = esProvider;
+ this.applicationScope = applicationScope;
+ this.alias = alias;
+ this.cursorTimeout = cursorTimeout;
+ }
+
+ public SearchRequestBuilder getBuilder(){
+ SearchRequestBuilder srb =
+ esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes( IndexingUtils.ES_ENTITY_TYPE ).setSearchType(
+ SearchType.QUERY_THEN_FETCH);
+
+
+ return srb;
+ }
+
+ public SearchScrollRequestBuilder getScrollBuilder(String scrollId){
+ return esProvider.getClient().prepareSearchScroll( scrollId );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2709569..d4ed36a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -32,6 +32,7 @@ import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -41,8 +42,6 @@ import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
-
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -50,6 +49,7 @@ import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -96,6 +96,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
private final Timer deleteApplicationTimer;
private final Meter deleteApplicationMeter;
private final SearchRequestBuilderStrategy searchRequest;
+ private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
private FailureMonitor failureMonitor;
private final int cursorTimeout;
private final long queryTimeout;
@@ -119,7 +120,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
this.esProvider = provider;
mapManager = mapManagerFactory.createMapManager( mapScope );
- this.searchTimer = metricsFactory.getTimer(EsApplicationEntityIndexImpl.class, "search.timer");
+ this.searchTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.timer" );
this.cursorTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.cursor.timer" );
this.cursorTimeout = config.getQueryCursorTimeout();
this.queryTimeout = config.getWriteTimeout();
@@ -132,6 +133,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
this.alias = indexIdentifier.getAlias();
this.searchRequest = new SearchRequestBuilderStrategy( esProvider, appScope, alias, cursorTimeout );
+ this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, appScope, alias, cursorTimeout );
}
@@ -156,7 +158,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
- .setTimeout( TimeValue.timeValueMillis(queryTimeout) );
+ .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
@@ -204,7 +206,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching for edge index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
- SearchTypes.fromTypes( entityId.getType()), srb );
+ SearchTypes.fromTypes( entityId.getType() ), srb );
}
try {
@@ -225,9 +227,72 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
@Override
- public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit,
+ public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit,
final int offset ) {
- throw new NotImplementedException( "Implement me or else I won't work." );
+
+ /**
+ * Take a list of IndexEdge, with an entityId
+ and query Es directly for matches
+
+ */
+ Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+ Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+ SearchResponse searchResponse;
+
+ List<CandidateResult> candidates = new ArrayList<>();
+
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+ FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+ IndexingUtils.idString( entityId ) );
+ srb.setPostFilter( termFilter );
+
+
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Searching for edge index (read alias): {}\n nodeId: {},\n query: {} ",
+ this.alias.getReadAlias(),entityId, srb );
+ }
+
+ try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = searchTimer.time();
+
+ //REfactor this out and have it return a modified parseResults that will create the candidateResults from
+ //the hit results and then keep that
+ //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+ //list that will hold all of the search hits
+
+
+ while(true){
+ //add search result hits to some sort of running tally of hits.
+ candidates = aggregateScrollResults( candidates, searchResponse );
+
+ searchResponse = searchRequestBuilderStrategyV2
+ .getScrollBuilder( searchResponse.getScrollId() )
+ .setScroll( new TimeValue( 6000 ) ).execute().actionGet();
+
+ if (searchResponse.getHits().getHits().length == 0) {
+ break;
+ }
+
+
+ }
+ timeSearch.stop();
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to communicate with Elasticsearch", t );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+ failureMonitor.success();
+
+ return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
}
@@ -314,4 +379,23 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
return candidateResults;
}
+ private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
+ final SearchResponse searchResponse ){
+
+ final SearchHits searchHits = searchResponse.getHits();
+ final SearchHit[] hits = searchHits.getHits();
+
+ logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
+
+ for ( SearchHit hit : hits ) {
+
+ final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
+
+ candidates.add( candidateResult );
+ }
+
+ return candidates;
+
+ }
+
}