You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/22 20:21:15 UTC
[1/3] incubator-usergrid git commit: Removed inner observable and
converted it into regular code that functions. Will convert back in next
push.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-608 ed317b295 -> 630cb4a87
Removed inner observable and converted it into regular code that functions. Will convert back in next push.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/11648ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/11648ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/11648ab1
Branch: refs/heads/USERGRID-608
Commit: 11648ab1c84be349a45020e0cc4a9cafd60d005b
Parents: ed317b2
Author: GERey <gr...@apigee.com>
Authored: Mon May 18 15:08:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Mon May 18 15:08:15 2015 -0700
----------------------------------------------------------------------
.../mvcc/stage/delete/UniqueCleanup.java | 106 ++++++++++++-------
1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/11648ab1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 0034f03..3e3e531 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -88,44 +88,76 @@ public class UniqueCleanup
final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+ Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields(
+ applicationScope, entityId );
+
+ final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+ while(uniqueFields.hasNext()){
+ UniqueValue uniqueValue = uniqueFields.next();
+ final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+ //TODO: should this be equals? That way we clean up the one marked as well
+ if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion ) >= 0){
+ logger
+ .debug( "Deleting value:{} from application scope: {} ", uniqueValue, applicationScope );
+ uniqueCleanupBatch.mergeShallow(
+ uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue ));
+
+ }
+ }
+
+ try {
+ uniqueCleanupBatch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute batch mutation", e );
+ }
+
+
+
//TODO Refactor this logic into a a class that can be invoked from anywhere
- //iterate all unique values
- final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
- Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
- @Override
- protected Iterator<UniqueValue> getIterator() {
- return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
- }
- } )
-
- //skip versions > the specified version
- .skipWhile( uniqueValue -> {
-
- final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
-
- return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
- } )
-
- //buffer our buffer size, then roll them all up in a single batch mutation
- .buffer( serializationFig.getBufferSize() )
-
- //roll them up
- .doOnNext( uniqueValues -> {
- final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
- for ( UniqueValue value : uniqueValues ) {
- uniqueCleanupBatch
- .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
- }
-
- try {
- uniqueCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
- } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+// //iterate all unique values
+// final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+// Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+// @Override
+// protected Iterator<UniqueValue> getIterator() {
+// return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+// }
+// } )
+//
+// //skip versions > the specified version
+// //TODO: does this emit for every version before the staticComparator?
+// .skipWhile( uniqueValue -> {
+//
+// logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
+// final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+// //TODO: should this be equals? That way we clean up the one marked as well
+// return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion ) > 0;
+// } )
+//
+// //buffer our buffer size, then roll them all up in a single batch mutation
+// .buffer( serializationFig.getBufferSize() )
+//
+// //roll them up
+// .doOnNext( uniqueValues -> {
+// final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+//
+//
+// for ( UniqueValue value : uniqueValues ) {
+// logger
+// .debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
+// uniqueCleanupBatch
+// .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+// }
+//
+// try {
+// uniqueCleanupBatch.execute();
+// }
+// catch ( ConnectionException e ) {
+// throw new RuntimeException( "Unable to execute batch mutation", e );
+// }
+// } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
} );
return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
[3/3] incubator-usergrid git commit: [USERGRID-677] Added revised
test for the scrolling query. Added fixes to method so that it properly logs
aggregation and searching for versions.
Posted by gr...@apache.org.
[USERGRID-677] Added revised test for the scrolling query.
Added fixes to method so that it properly logs aggregation and searching for versions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/630cb4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/630cb4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/630cb4a8
Branch: refs/heads/USERGRID-608
Commit: 630cb4a87e0943d03b9d515166b1529b4cd649d6
Parents: cf823c4
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 11:21:09 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 11:21:09 2015 -0700
----------------------------------------------------------------------
.../impl/EsApplicationEntityIndexImpl.java | 43 +++++++++++------
.../persistence/index/impl/EntityIndexTest.java | 49 +++++++++++++++++++-
2 files changed, 75 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index d4ed36a..ad47348 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -31,6 +31,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
@@ -227,16 +228,11 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
@Override
- public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit,
- final int offset ) {
+ public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion) {
- /**
- * Take a list of IndexEdge, with an entityId
- and query Es directly for matches
-
- */
Preconditions.checkNotNull( entityId, "entityId cannot be null" );
- Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+ //TODO: check to see if there is some version verifcation. I know there is but i forget where.
+ Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
SearchResponse searchResponse;
@@ -246,14 +242,21 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
- FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+ //I can't just search on the entity Id.
+ FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
IndexingUtils.idString( entityId ) );
- srb.setPostFilter( termFilter );
+
+ FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
+
+ //aggregate the filters into the and filder and feed that in.
+ FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter );
+
+ srb.setPostFilter(andFilter);
if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for edge index (read alias): {}\n nodeId: {},\n query: {} ",
+ logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
this.alias.getReadAlias(),entityId, srb );
}
@@ -264,6 +267,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
//REfactor this out and have it return a modified parseResults that will create the candidateResults from
//the hit results and then keep that
//set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
+ //TODO: review this and make them not magic numbers when acking this PR.
searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
//list that will hold all of the search hits
@@ -273,9 +278,17 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
//add search result hits to some sort of running tally of hits.
candidates = aggregateScrollResults( candidates, searchResponse );
- searchResponse = searchRequestBuilderStrategyV2
+ SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
.getScrollBuilder( searchResponse.getScrollId() )
- .setScroll( new TimeValue( 6000 ) ).execute().actionGet();
+ .setScroll( new TimeValue( 6000 ) );
+
+ //TODO: figure out how to log exactly what we're putting into elasticsearch
+// if ( logger.isDebugEnabled() ) {
+// logger.debug( "Scroll search using query: {} ",
+// ssrb.toString() );
+// }
+
+ searchResponse = ssrb.execute().actionGet();
if (searchResponse.getHits().getHits().length == 0) {
break;
@@ -385,8 +398,6 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
final SearchHits searchHits = searchResponse.getHits();
final SearchHit[] hits = searchHits.getHits();
- logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
-
for ( SearchHit hit : hits ) {
final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
@@ -394,6 +405,8 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
candidates.add( candidateResult );
}
+ logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );
+
return candidates;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/630cb4a8/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index df0c701..79fc14a 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -298,7 +298,7 @@ public class EntityIndexTest extends BaseIT {
insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 0 );
- ei.addIndex("v2", 1, 0, "one");
+ ei.addIndex( "v2", 1, 0, "one" );
ei.refreshAsync().toBlocking().first();
insertJsonBlob( entityIndex, entityType, searchEdge, "/sample-large.json", 1, 1 );
@@ -325,7 +325,7 @@ public class EntityIndexTest extends BaseIT {
batch.execute().toBlocking().last();
IndexRefreshCommandImpl.IndexRefreshCommandInfo info = ei.refreshAsync().toBlocking().first();
long time = info.getExecutionTime();
- log.info("refresh took ms:"+time);
+ log.info( "refresh took ms:" + time );
}
@@ -405,6 +405,51 @@ public class EntityIndexTest extends BaseIT {
}
+ /**
+ * Tests that we aggregate results only before the halfway version point.
+ */
+ @Test
+ public void testScollingDeindex() {
+
+ int numberOfEntities = 1000;
+ int versionToSearchFor = numberOfEntities / 2;
+
+ Id appId = new SimpleId( "application" );
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
+
+ IndexEdge searchEdge = new IndexEdgeImpl( appId, "mehCars", SearchEdge.NodeType.SOURCE, 1 );
+
+ ApplicationEntityIndex entityIndex = eif.createApplicationEntityIndex( applicationScope );
+
+ UUID entityUUID = UUID.randomUUID();
+ Id entityId = new SimpleId( "mehCar" );
+
+ Map entityMap = new HashMap() {{
+ put( "name", "Toyota Corolla" );
+ put( "introduced", 1966 );
+ put( "topspeed", 111 );
+ }};
+
+ Entity[] entity = new Entity[numberOfEntities];
+ for(int i = 0; i < numberOfEntities; i++) {
+ entity[i] = EntityIndexMapUtils.fromMap( entityMap );
+ EntityUtils.setId( entity[i], entityId );
+ EntityUtils.setVersion( entity[i], UUIDGenerator.newTimeUUID() );
+ entity[i].setField( new UUIDField( IndexingUtils.ENTITY_ID_FIELDNAME, entityUUID ) );
+
+ //index the new entity. This is where the loop will be set to create like 100 entities.
+ entityIndex.createBatch().index( searchEdge, entity[i] ).execute().toBlocking().last();
+ }
+ ei.refreshAsync().toBlocking().first();
+
+ CandidateResults candidateResults = entityIndex
+ .getAllEntityVersionBeforeMark( entity[versionToSearchFor].getId(), entity[versionToSearchFor].getVersion());
+ assertEquals( 501, candidateResults.size() );
+ }
+
+
+
private CandidateResults testQuery( final SearchEdge scope, final SearchTypes searchTypes,
final ApplicationEntityIndex entityIndex, final String queryString,
final int num ) {
[2/3] incubator-usergrid git commit: [USERGRID-677] Added a new
SearchRequestBuilderStrategy that allows you to make term post filters easily
without the cruft of the regular SearchRequestBuilderStrategy. Added a
version of the scrolling api and an addit
Posted by gr...@apache.org.
[USERGRID-677] Added a new SearchRequestBuilderStrategy that allows you to make term post filters easily without the cruft of the regular SearchRequestBuilderStrategy.
Added a version of the scrolling api and an additional parser that will let us aggregate the from the constant queries and return a single CandidateResults set.
Added a test that currently just checks to make sure the new scrolling search still functions as a regular search ( it does! Yay! ).
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf823c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf823c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf823c47
Branch: refs/heads/USERGRID-608
Commit: cf823c47d3fc2bb92198e7d1e65ecc4be96ccfcc
Parents: 11648ab
Author: GERey <gr...@apigee.com>
Authored: Fri May 22 09:53:44 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri May 22 09:53:44 2015 -0700
----------------------------------------------------------------------
.../SearchRequestBuilderStrategyV2.java | 69 ++++++++++++++
.../impl/EsApplicationEntityIndexImpl.java | 98 ++++++++++++++++++--
2 files changed, 160 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
new file mode 100644
index 0000000..0511b75
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder;
+
+
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
+import org.elasticsearch.action.search.SearchType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.impl.EsProvider;
+import org.apache.usergrid.persistence.index.impl.IndexAlias;
+import org.apache.usergrid.persistence.index.impl.IndexingUtils;
+
+
+/**
+ * The class to use when creating new custom queries for specific terms to elasticsearch.
+ */
+public class SearchRequestBuilderStrategyV2 {
+ private static final Logger logger = LoggerFactory.getLogger( SearchRequestBuilderStrategyV2.class );
+
+ private final EsProvider esProvider;
+ private final ApplicationScope applicationScope;
+ private final IndexAlias alias;
+ private final int cursorTimeout;
+
+
+ public SearchRequestBuilderStrategyV2( final EsProvider esProvider, final ApplicationScope applicationScope,
+ final IndexAlias alias, int cursorTimeout ) {
+
+ this.esProvider = esProvider;
+ this.applicationScope = applicationScope;
+ this.alias = alias;
+ this.cursorTimeout = cursorTimeout;
+ }
+
+ public SearchRequestBuilder getBuilder(){
+ SearchRequestBuilder srb =
+ esProvider.getClient().prepareSearch( alias.getReadAlias() ).setTypes( IndexingUtils.ES_ENTITY_TYPE ).setSearchType(
+ SearchType.QUERY_THEN_FETCH);
+
+
+ return srb;
+ }
+
+ public SearchScrollRequestBuilder getScrollBuilder(String scrollId){
+ return esProvider.getClient().prepareSearchScroll( scrollId );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf823c47/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
index 2709569..d4ed36a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
@@ -32,6 +32,7 @@ import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -41,8 +42,6 @@ import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
-
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -50,6 +49,7 @@ import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.CandidateResult;
import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -96,6 +96,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
private final Timer deleteApplicationTimer;
private final Meter deleteApplicationMeter;
private final SearchRequestBuilderStrategy searchRequest;
+ private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
private FailureMonitor failureMonitor;
private final int cursorTimeout;
private final long queryTimeout;
@@ -119,7 +120,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
this.esProvider = provider;
mapManager = mapManagerFactory.createMapManager( mapScope );
- this.searchTimer = metricsFactory.getTimer(EsApplicationEntityIndexImpl.class, "search.timer");
+ this.searchTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.timer" );
this.cursorTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.cursor.timer" );
this.cursorTimeout = config.getQueryCursorTimeout();
this.queryTimeout = config.getWriteTimeout();
@@ -132,6 +133,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
this.alias = indexIdentifier.getAlias();
this.searchRequest = new SearchRequestBuilderStrategy( esProvider, appScope, alias, cursorTimeout );
+ this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, appScope, alias, cursorTimeout );
}
@@ -156,7 +158,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
- .setTimeout( TimeValue.timeValueMillis(queryTimeout) );
+ .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
@@ -204,7 +206,7 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
if ( logger.isDebugEnabled() ) {
logger.debug( "Searching for edge index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
this.alias.getReadAlias(), edge.getNodeId(), edge.getEdgeName(),
- SearchTypes.fromTypes( entityId.getType()), srb );
+ SearchTypes.fromTypes( entityId.getType() ), srb );
}
try {
@@ -225,9 +227,72 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
@Override
- public CandidateResults getAllEntityVersionBeforeMark( final Id entityId, final UUID markedVersion, final int limit,
+ public CandidateResults getAllEntityVersionBeforeMark( final Id entityId , final UUID markedVersion, final int limit,
final int offset ) {
- throw new NotImplementedException( "Implement me or else I won't work." );
+
+ /**
+ * Take a list of IndexEdge, with an entityId
+ and query Es directly for matches
+
+ */
+ Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+ Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+ SearchResponse searchResponse;
+
+ List<CandidateResult> candidates = new ArrayList<>();
+
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+ FilterBuilder termFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+ IndexingUtils.idString( entityId ) );
+ srb.setPostFilter( termFilter );
+
+
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Searching for edge index (read alias): {}\n nodeId: {},\n query: {} ",
+ this.alias.getReadAlias(),entityId, srb );
+ }
+
+ try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = searchTimer.time();
+
+ //REfactor this out and have it return a modified parseResults that will create the candidateResults from
+ //the hit results and then keep that
+ //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+ //list that will hold all of the search hits
+
+
+ while(true){
+ //add search result hits to some sort of running tally of hits.
+ candidates = aggregateScrollResults( candidates, searchResponse );
+
+ searchResponse = searchRequestBuilderStrategyV2
+ .getScrollBuilder( searchResponse.getScrollId() )
+ .setScroll( new TimeValue( 6000 ) ).execute().actionGet();
+
+ if (searchResponse.getHits().getHits().length == 0) {
+ break;
+ }
+
+
+ }
+ timeSearch.stop();
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to communicate with Elasticsearch", t );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+ failureMonitor.success();
+
+ return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
}
@@ -314,4 +379,23 @@ public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
return candidateResults;
}
+ private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
+ final SearchResponse searchResponse ){
+
+ final SearchHits searchHits = searchResponse.getHits();
+ final SearchHit[] hits = searchHits.getHits();
+
+ logger.debug( "Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
+
+ for ( SearchHit hit : hits ) {
+
+ final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
+
+ candidates.add( candidateResult );
+ }
+
+ return candidates;
+
+ }
+
}