You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/28 00:15:59 UTC
[04/10] usergrid git commit: add aggregation service
add aggregation service
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/198f4891
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/198f4891
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/198f4891
Branch: refs/heads/two-dot-o-dev
Commit: 198f489155b3f9d4346d62c0fe7ba859be917b70
Parents: c47b02d
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Aug 26 09:36:32 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Aug 26 09:36:32 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/index/EntityIndex.java | 61 ++++++---
.../index/impl/EsEntityIndexImpl.java | 51 +++++++-
.../persistence/index/impl/EntityIndexTest.java | 129 ++++++++++++++++---
3 files changed, 193 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 7fa2f07..6d563ff 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -39,34 +39,47 @@ public interface EntityIndex extends CPManager {
/**
* Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
- * @param indexSuffix index name
+ *
+ * @param indexSuffix index name
* @param shards
* @param replicas
* @param writeConsistency
*/
- void addIndex(
- final String indexSuffix,
- final int shards,
- final int replicas,
- final String writeConsistency
- );
+ void addIndex(
+ final String indexSuffix,
+ final int shards,
+ final int replicas,
+ final String writeConsistency
+ );
/**
* Refresh the index.
*/
- Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync();
+ Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync();
/**
* Check health of cluster.
*/
- Health getClusterHealth();
+ Health getClusterHealth();
/**
* Check health of this specific index.
*/
- Health getIndexHealth();
+ Health getIndexHealth();
+ /**
+ * get total entity size
+ * @return
+ */
+ long getEntitySize();
+
+ /**
+ * get total entity size by an edge -> "term":{"edgeName":"zzzcollzzz|roles"}
+ * @param edge
+ * @return
+ */
+ long getEntitySize(final String edge);
/**
* Initialize the index if necessary. This is an idempotent operation and should not create an index
@@ -82,48 +95,54 @@ public interface EntityIndex extends CPManager {
/**
* Search on every document in the specified search edge. Also search by the types if specified
- * @param searchEdge The edge to search on
+ *
+ * @param searchEdge The edge to search on
* @param searchTypes The search types to search
- * @param query The query to execute
- * @param limit The limit of values to return
- * @param offset The offset to query on
+ * @param query The query to execute
+ * @param limit The limit of values to return
+ * @param offset The offset to query on
* @return
*/
- CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
- final int limit, final int offset );
+ CandidateResults search(final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
+ final int limit, final int offset);
/**
* Same as search, just iterates all documents that match the index edge exactly.
- * @param edge The edge to search on
+ *
+ * @param edge The edge to search on
* @param entityId The entity that the searchEdge is connected to.
* @return
*/
- CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
+ CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
/**
* Returns all entity documents that match the entityId and come before the marked version
- * @param entityId The entityId to match when searching
+ *
+ * @param entityId The entityId to match when searching
* @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
* @return
*/
- CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion );
+ CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion);
/**
* delete all application records
+ *
* @return
*/
Observable deleteApplication();
/**
* Get the indexes for an alias
+ *
* @param aliasType name of alias
* @return list of index names
*/
- String[] getIndexes( final AliasType aliasType );
+ String[] getIndexes(final AliasType aliasType);
/**
* get all unique indexes
+ *
* @return
*/
String[] getIndexes();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 87e2dbd..800dac3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -62,6 +62,10 @@ import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -424,7 +428,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
and query Es directly for matches
*/
- IndexValidationUtils.validateSearchEdge( edge );
+ IndexValidationUtils.validateSearchEdge(edge);
Preconditions.checkNotNull( entityId, "entityId cannot be null" );
SearchResponse searchResponse;
@@ -505,12 +509,12 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
- FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
- IndexingUtils.entityId( entityId ) );
+ FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
+ IndexingUtils.entityId(entityId));
- FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
+ FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte(markedVersion);
- FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter );
+ FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter, entityVersionFilter);
srb.setPostFilter(andFilter);
@@ -570,7 +574,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
* Completely delete an index.
*/
public Observable deleteApplication() {
- String idString = applicationId( applicationScope.getApplication() );
+ String idString = applicationId(applicationScope.getApplication());
final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
final String[] indexes = getIndexes();
//Added For Graphite Metrics
@@ -734,6 +738,41 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
}
@Override
+ public long getEntitySize(){
+
+
+ SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder();
+ return getEntitySizeAggregation(builder);
+ }
+
+
+ @Override
+ public long getEntitySize(final String edge){
+ //"term":{"edgeName":"zzzcollzzz|roles"}
+ SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder();
+ builder.setQuery(new TermQueryBuilder("edgeName",edge));
+ return getEntitySizeAggregation(builder);
+ }
+
+ private long getEntitySizeAggregation( SearchRequestBuilder builder) {
+ final String key = "entitySize";
+ SumBuilder sumBuilder = new SumBuilder(key);
+ sumBuilder.field("entitySize");
+ builder.addAggregation(sumBuilder);
+ Observable<Number> o = Observable.from(builder.execute())
+ .map(response -> {
+ Sum aggregation = (Sum) response.getAggregations().get(key);
+ if(aggregation == null){
+ return -1;
+ }else{
+ return aggregation.getValue();
+ }
+ });
+ Number val = o.toBlocking().first();
+ return val.longValue();
+ }
+
+ @Override
public int getImplementationVersion() {
return IndexDataVersions.SINGLE_INDEX.getVersion();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 9154382..564e5e7 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -270,7 +270,7 @@ public class EntityIndexTest extends BaseIT {
insertJsonBlob( entityType, searchEdge, "/sample-large.json", 1, 0 );
- entityIndex.addIndex(UUID.randomUUID()+ "v2", 1, 0, "one" );
+ entityIndex.addIndex(UUID.randomUUID() + "v2", 1, 0, "one");
entityIndex.refreshAsync().toBlocking().first();
insertJsonBlob( entityType, searchEdge, "/sample-large.json", 1, 1 );
@@ -278,12 +278,12 @@ public class EntityIndexTest extends BaseIT {
CandidateResults crs = testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 1 );
EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
- entityIndexBatch.deindex( searchEdge, crs.get( 0 ) );
+ entityIndexBatch.deindex(searchEdge, crs.get(0));
entityIndexBatch.execute().toBlocking().last();
entityIndex.refreshAsync().toBlocking().first();
//Hilda Youn
- testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 0 );
+ testQuery(searchEdge, searchTypes, "name = 'Bowers Oneil'", 0);
}
@@ -291,13 +291,14 @@ public class EntityIndexTest extends BaseIT {
String filePath, final int max, final int startIndex ) throws IOException {
InputStream is = this.getClass().getResourceAsStream( filePath );
ObjectMapper mapper = new ObjectMapper();
- List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
+ List<Object> sampleJson = mapper.readValue(is, new TypeReference<List<Object>>() {
+ });
EntityIndexBatch batch = entityIndex.createBatch();
- insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex );
+ insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex);
batch.execute().toBlocking().last();
IndexRefreshCommandImpl.IndexRefreshCommandInfo info = entityIndex.refreshAsync().toBlocking().first();
long time = info.getExecutionTime();
- log.info( "refresh took ms:" + time );
+ log.info("refresh took ms:" + time);
}
@@ -366,7 +367,7 @@ public class EntityIndexTest extends BaseIT {
candidateResults = entityIndex
.search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 );
- assertEquals( 0, candidateResults.size() );
+ assertEquals(0, candidateResults.size());
}
@@ -420,8 +421,8 @@ public class EntityIndexTest extends BaseIT {
timer.stop();
- assertEquals( num, candidateResults.size() );
- log.debug( "Query time {}ms", timer.getTime() );
+ assertEquals(num, candidateResults.size());
+ log.debug("Query time {}ms", timer.getTime());
return candidateResults;
}
@@ -609,13 +610,13 @@ public class EntityIndexTest extends BaseIT {
assertEquals( bill.getId(), r.get( 0 ).getId() );
r = entityIndex.search( indexScope, searchTypes, "where username = 'fred'", 10, 0);
- assertEquals( fred.getId(), r.get( 0 ).getId() );
+ assertEquals(fred.getId(), r.get(0).getId());
r = entityIndex.search( indexScope, searchTypes, "where age = 41", 10, 0);
- assertEquals( fred.getId(), r.get( 0 ).getId() );
+ assertEquals(fred.getId(), r.get(0).getId());
r = entityIndex.search( indexScope, searchTypes, "where age = 'thirtysomething'", 10, 0);
- assertEquals( bill.getId(), r.get( 0 ).getId() );
+ assertEquals(bill.getId(), r.get(0).getId());
}
@@ -749,8 +750,8 @@ public class EntityIndexTest extends BaseIT {
final String query = "where searchUUID = " + searchUUID;
final CandidateResults r =
- entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0);
- assertEquals( user.getId(), r.get( 0 ).getId() );
+ entityIndex.search( indexSCope, SearchTypes.fromTypes(entityId.getType()), query, 10, 0);
+ assertEquals(user.getId(), r.get(0).getId());
}
@@ -781,7 +782,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
- batch.index( indexSCope, user );
+ batch.index(indexSCope, user);
batch.execute().toBlocking().last();
entityIndex.refreshAsync().toBlocking().first();
@@ -790,7 +791,7 @@ public class EntityIndexTest extends BaseIT {
final CandidateResults r =
entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0);
- assertEquals(user.getId(), r.get(0).getId() );
+ assertEquals(user.getId(), r.get(0).getId());
//shouldn't match
final String queryNoWildCard = "where string = 'I am'";
@@ -830,7 +831,7 @@ public class EntityIndexTest extends BaseIT {
final Entity second = new Entity( "search" );
- second.setField( new StringField( "string", "bravo long string" ) );
+ second.setField(new StringField("string", "bravo long string"));
EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
@@ -897,11 +898,11 @@ public class EntityIndexTest extends BaseIT {
//get ordering, so 2 is before 1 when both match
IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 );
- batch.index( indexScope1, first );
+ batch.index(indexScope1, first);
IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 );
- batch.index( indexScope2, second);
+ batch.index(indexScope2, second);
batch.execute().toBlocking().last();
@@ -914,8 +915,8 @@ public class EntityIndexTest extends BaseIT {
entityIndex.search(indexScope1, SearchTypes.fromTypes( first.getId().getType() ), singleMatchQuery, 10, 0 );
- assertEquals( 1, singleResults.size() );
- assertEquals( first.getId(), singleResults.get( 0 ).getId() );
+ assertEquals(1, singleResults.size());
+ assertEquals(first.getId(), singleResults.get(0).getId());
//search in reversed
@@ -1213,6 +1214,92 @@ public class EntityIndexTest extends BaseIT {
assertEquals( 0, noMatchesContainsOrResults.size() );
}
+ @Test
+ public void testSize(){
+ final String type = UUID.randomUUID().toString();
+
+ Id ownerId = new SimpleId( type );
+
+
+ final Entity first = new Entity( "search" );
+
+ first.setField( new StringField( "string", "I ate a sammich" ) );
+ first.setSize(100);
+
+ EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() );
+
+
+ final Entity second = new Entity( "search" );
+ second.setSize(100);
+
+ second.setField( new StringField( "string", "I drank a beer" ) );
+
+
+ EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
+
+
+ EntityIndexBatch batch = entityIndex.createBatch();
+
+
+ //get ordering, so 2 is before 1 when both match
+ IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 );
+ batch.index( indexScope1, first );
+
+
+ IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 );
+ batch.index( indexScope2, second);
+
+
+ batch.execute().toBlocking().last();
+ entityIndex.refreshAsync().toBlocking().first();
+ long size = entityIndex.getEntitySize();
+ assertTrue( size >= 200 );
+
+ }
+
+ @Test
+ public void testSizeByEdge(){
+ final String type = UUID.randomUUID().toString();
+
+ Id ownerId = new SimpleId( "owner" );
+
+
+ final Entity first = new Entity( type );
+
+ first.setField( new StringField( "string", "I ate a sammich" ) );
+ first.setSize(100);
+
+ EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() );
+
+
+ final Entity second = new Entity( type );
+ second.setSize(100);
+
+ second.setField( new StringField( "string", "I drank a beer" ) );
+
+
+ EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
+
+
+ EntityIndexBatch batch = entityIndex.createBatch();
+
+
+ //get ordering, so 2 is before 1 when both match
+ IndexEdge indexScope1 = new IndexEdgeImpl( ownerId,type , SearchEdge.NodeType.SOURCE, 10 );
+ batch.index( indexScope1, first );
+
+
+ IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, type+"er", SearchEdge.NodeType.SOURCE, 11 );
+ batch.index( indexScope2, second);
+
+
+ batch.execute().toBlocking().last();
+ entityIndex.refreshAsync().toBlocking().first();
+ long size = entityIndex.getEntitySize(type);
+ assertTrue( size == 100 );
+
+ }
+
}