You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/28 00:15:59 UTC

[04/10] usergrid git commit: add aggregation service

add aggregation service


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/198f4891
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/198f4891
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/198f4891

Branch: refs/heads/two-dot-o-dev
Commit: 198f489155b3f9d4346d62c0fe7ba859be917b70
Parents: c47b02d
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Aug 26 09:36:32 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Aug 26 09:36:32 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/index/EntityIndex.java |  61 ++++++---
 .../index/impl/EsEntityIndexImpl.java           |  51 +++++++-
 .../persistence/index/impl/EntityIndexTest.java | 129 ++++++++++++++++---
 3 files changed, 193 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index 7fa2f07..6d563ff 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -39,34 +39,47 @@ public interface EntityIndex extends CPManager {
 
     /**
      * Create an index and add to alias, will create alias and remove any old index from write alias if alias already exists
-     * @param indexSuffix index name
+     *
+     * @param indexSuffix      index name
      * @param shards
      * @param replicas
      * @param writeConsistency
      */
-     void addIndex(
-         final String indexSuffix,
-         final int shards,
-         final int replicas,
-         final String writeConsistency
-     );
+    void addIndex(
+        final String indexSuffix,
+        final int shards,
+        final int replicas,
+        final String writeConsistency
+    );
 
     /**
      * Refresh the index.
      */
-     Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync();
+    Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync();
 
 
     /**
      * Check health of cluster.
      */
-     Health getClusterHealth();
+    Health getClusterHealth();
 
     /**
      * Check health of this specific index.
      */
-     Health getIndexHealth();
+    Health getIndexHealth();
 
+    /**
+     * get total entity size
+     * @return
+     */
+    long getEntitySize();
+
+    /**
+     * get total entity size by an edge ->   "term":{"edgeName":"zzzcollzzz|roles"}
+     * @param edge
+     * @return
+     */
+    long getEntitySize(final String edge);
 
     /**
      * Initialize the index if necessary.  This is an idempotent operation and should not create an index
@@ -82,48 +95,54 @@ public interface EntityIndex extends CPManager {
 
     /**
      * Search on every document in the specified search edge.  Also search by the types if specified
-     * @param searchEdge The edge to search on
+     *
+     * @param searchEdge  The edge to search on
      * @param searchTypes The search types to search
-     * @param query The query to execute
-     * @param limit The limit of values to return
-     * @param offset The offset to query on
+     * @param query       The query to execute
+     * @param limit       The limit of values to return
+     * @param offset      The offset to query on
      * @return
      */
-    CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
-                             final int limit, final int offset );
+    CandidateResults search(final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
+                            final int limit, final int offset);
 
 
     /**
      * Same as search, just iterates all documents that match the index edge exactly.
-     * @param edge The edge to search on
+     *
+     * @param edge     The edge to search on
      * @param entityId The entity that the searchEdge is connected to.
      * @return
      */
-    CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
+    CandidateResults getAllEdgeDocuments(final IndexEdge edge, final Id entityId);
 
     /**
      * Returns all entity documents that match the entityId and come before the marked version
-     * @param entityId The entityId to match when searching
+     *
+     * @param entityId      The entityId to match when searching
      * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
      * @return
      */
-    CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion );
+    CandidateResults getAllEntityVersionsBeforeMarkedVersion(final Id entityId, final UUID markedVersion);
 
     /**
      * delete all application records
+     *
      * @return
      */
     Observable deleteApplication();
 
     /**
      * Get the indexes for an alias
+     *
      * @param aliasType name of alias
      * @return list of index names
      */
-    String[] getIndexes( final AliasType aliasType );
+    String[] getIndexes(final AliasType aliasType);
 
     /**
      * get all unique indexes
+     *
      * @return
      */
     String[] getIndexes();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 87e2dbd..800dac3 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -62,6 +62,10 @@ import org.elasticsearch.index.query.*;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -424,7 +428,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
          and query Es directly for matches
 
          */
-        IndexValidationUtils.validateSearchEdge( edge );
+        IndexValidationUtils.validateSearchEdge(edge);
         Preconditions.checkNotNull( entityId, "entityId cannot be null" );
 
         SearchResponse searchResponse;
@@ -505,12 +509,12 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
         final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
 
-        FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
-            IndexingUtils.entityId( entityId ) );
+        FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
+            IndexingUtils.entityId(entityId));
 
-        FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
+        FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte(markedVersion);
 
-        FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter  );
+        FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter, entityVersionFilter);
 
         srb.setPostFilter(andFilter);
 
@@ -570,7 +574,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
      * Completely delete an index.
      */
     public Observable deleteApplication() {
-        String idString = applicationId( applicationScope.getApplication() );
+        String idString = applicationId(applicationScope.getApplication());
         final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
         final String[] indexes = getIndexes();
         //Added For Graphite Metrics
@@ -734,6 +738,41 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
     }
 
     @Override
+    public long getEntitySize(){
+
+
+        SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder();
+        return  getEntitySizeAggregation(builder);
+    }
+
+
+    @Override
+    public long getEntitySize(final String edge){
+        //"term":{"edgeName":"zzzcollzzz|roles"}
+        SearchRequestBuilder builder = searchRequestBuilderStrategyV2.getBuilder();
+        builder.setQuery(new TermQueryBuilder("edgeName",edge));
+        return  getEntitySizeAggregation(builder);
+    }
+
+    private long getEntitySizeAggregation(  SearchRequestBuilder builder) {
+        final String key = "entitySize";
+        SumBuilder sumBuilder = new SumBuilder(key);
+        sumBuilder.field("entitySize");
+        builder.addAggregation(sumBuilder);
+        Observable<Number> o = Observable.from(builder.execute())
+            .map(response -> {
+                Sum aggregation = (Sum) response.getAggregations().get(key);
+                if(aggregation == null){
+                    return -1;
+                }else{
+                    return aggregation.getValue();
+                }
+            });
+        Number val =  o.toBlocking().first();
+        return val.longValue();
+    }
+
+    @Override
     public int getImplementationVersion() {
         return IndexDataVersions.SINGLE_INDEX.getVersion();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/198f4891/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 9154382..564e5e7 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -270,7 +270,7 @@ public class EntityIndexTest extends BaseIT {
         insertJsonBlob(  entityType, searchEdge, "/sample-large.json", 1, 0 );
 
 
-        entityIndex.addIndex(UUID.randomUUID()+ "v2", 1, 0, "one" );
+        entityIndex.addIndex(UUID.randomUUID() + "v2", 1, 0, "one");
         entityIndex.refreshAsync().toBlocking().first();
 
         insertJsonBlob(  entityType, searchEdge, "/sample-large.json", 1, 1 );
@@ -278,12 +278,12 @@ public class EntityIndexTest extends BaseIT {
         CandidateResults crs = testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 1 );
 
         EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
-        entityIndexBatch.deindex( searchEdge, crs.get( 0 ) );
+        entityIndexBatch.deindex(searchEdge, crs.get(0));
         entityIndexBatch.execute().toBlocking().last();
         entityIndex.refreshAsync().toBlocking().first();
 
         //Hilda Youn
-        testQuery( searchEdge, searchTypes, "name = 'Bowers Oneil'", 0 );
+        testQuery(searchEdge, searchTypes, "name = 'Bowers Oneil'", 0);
     }
 
 
@@ -291,13 +291,14 @@ public class EntityIndexTest extends BaseIT {
                                  String filePath, final int max, final int startIndex ) throws IOException {
         InputStream is = this.getClass().getResourceAsStream( filePath );
         ObjectMapper mapper = new ObjectMapper();
-        List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
+        List<Object> sampleJson = mapper.readValue(is, new TypeReference<List<Object>>() {
+        });
         EntityIndexBatch batch = entityIndex.createBatch();
-        insertJsonBlob( sampleJson, batch, entityType, indexEdge, max, startIndex );
+        insertJsonBlob(sampleJson, batch, entityType, indexEdge, max, startIndex);
         batch.execute().toBlocking().last();
         IndexRefreshCommandImpl.IndexRefreshCommandInfo info =  entityIndex.refreshAsync().toBlocking().first();
         long time = info.getExecutionTime();
-        log.info( "refresh took ms:" + time );
+        log.info("refresh took ms:" + time);
     }
 
 
@@ -366,7 +367,7 @@ public class EntityIndexTest extends BaseIT {
 
         candidateResults = entityIndex
             .search(searchEdge, SearchTypes.fromTypes( entity.getId().getType() ), "name contains 'Ferrari*'", 10, 0 );
-        assertEquals( 0, candidateResults.size() );
+        assertEquals(0, candidateResults.size());
     }
 
 
@@ -420,8 +421,8 @@ public class EntityIndexTest extends BaseIT {
 
         timer.stop();
 
-        assertEquals( num, candidateResults.size() );
-        log.debug( "Query time {}ms", timer.getTime() );
+        assertEquals(num, candidateResults.size());
+        log.debug("Query time {}ms", timer.getTime());
         return candidateResults;
     }
 
@@ -609,13 +610,13 @@ public class EntityIndexTest extends BaseIT {
         assertEquals( bill.getId(), r.get( 0 ).getId() );
 
         r = entityIndex.search( indexScope, searchTypes, "where username = 'fred'", 10, 0);
-        assertEquals( fred.getId(), r.get( 0 ).getId() );
+        assertEquals(fred.getId(), r.get(0).getId());
 
         r = entityIndex.search( indexScope, searchTypes, "where age = 41", 10, 0);
-        assertEquals( fred.getId(), r.get( 0 ).getId() );
+        assertEquals(fred.getId(), r.get(0).getId());
 
         r = entityIndex.search( indexScope, searchTypes, "where age = 'thirtysomething'", 10, 0);
-        assertEquals( bill.getId(), r.get( 0 ).getId() );
+        assertEquals(bill.getId(), r.get(0).getId());
     }
 
 
@@ -749,8 +750,8 @@ public class EntityIndexTest extends BaseIT {
         final String query = "where searchUUID = " + searchUUID;
 
         final CandidateResults r =
-            entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0);
-        assertEquals( user.getId(), r.get( 0 ).getId() );
+            entityIndex.search( indexSCope, SearchTypes.fromTypes(entityId.getType()), query, 10, 0);
+        assertEquals(user.getId(), r.get(0).getId());
     }
 
 
@@ -781,7 +782,7 @@ public class EntityIndexTest extends BaseIT {
 
         EntityIndexBatch batch = entityIndex.createBatch();
 
-        batch.index( indexSCope, user );
+        batch.index(indexSCope, user);
         batch.execute().toBlocking().last();
         entityIndex.refreshAsync().toBlocking().first();
 
@@ -790,7 +791,7 @@ public class EntityIndexTest extends BaseIT {
         final CandidateResults r =
             entityIndex.search( indexSCope, SearchTypes.fromTypes( entityId.getType() ), query, 10, 0);
 
-        assertEquals(user.getId(), r.get(0).getId() );
+        assertEquals(user.getId(), r.get(0).getId());
 
         //shouldn't match
         final String queryNoWildCard = "where string = 'I am'";
@@ -830,7 +831,7 @@ public class EntityIndexTest extends BaseIT {
 
         final Entity second = new Entity( "search" );
 
-        second.setField( new StringField( "string", "bravo long string" ) );
+        second.setField(new StringField("string", "bravo long string"));
 
 
         EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
@@ -897,11 +898,11 @@ public class EntityIndexTest extends BaseIT {
 
         //get ordering, so 2 is before 1 when both match
         IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 );
-        batch.index( indexScope1, first );
+        batch.index(indexScope1, first);
 
 
         IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 );
-        batch.index( indexScope2, second);
+        batch.index(indexScope2, second);
 
 
         batch.execute().toBlocking().last();
@@ -914,8 +915,8 @@ public class EntityIndexTest extends BaseIT {
             entityIndex.search(indexScope1, SearchTypes.fromTypes( first.getId().getType() ), singleMatchQuery, 10, 0 );
 
 
-        assertEquals( 1, singleResults.size() );
-        assertEquals( first.getId(), singleResults.get( 0 ).getId() );
+        assertEquals(1, singleResults.size());
+        assertEquals(first.getId(), singleResults.get(0).getId());
 
 
         //search in reversed
@@ -1213,6 +1214,92 @@ public class EntityIndexTest extends BaseIT {
         assertEquals( 0, noMatchesContainsOrResults.size() );
     }
 
+    @Test
+    public void testSize(){
+        final String type = UUID.randomUUID().toString();
+
+        Id ownerId = new SimpleId( type );
+
+
+        final Entity first = new Entity( "search" );
+
+        first.setField( new StringField( "string", "I ate a sammich" ) );
+        first.setSize(100);
+
+        EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() );
+
+
+        final Entity second = new Entity( "search" );
+        second.setSize(100);
+
+        second.setField( new StringField( "string", "I drank a beer" ) );
+
+
+        EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
+
+
+        EntityIndexBatch batch = entityIndex.createBatch();
+
+
+        //get ordering, so 2 is before 1 when both match
+        IndexEdge indexScope1 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 10 );
+        batch.index( indexScope1, first );
+
+
+        IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, "searches", SearchEdge.NodeType.SOURCE, 11 );
+        batch.index( indexScope2, second);
+
+
+        batch.execute().toBlocking().last();
+        entityIndex.refreshAsync().toBlocking().first();
+        long size = entityIndex.getEntitySize();
+        assertTrue( size >= 200 );
+
+    }
+
+    @Test
+    public void testSizeByEdge(){
+        final String type = UUID.randomUUID().toString();
+
+        Id ownerId = new SimpleId( "owner" );
+
+
+        final Entity first = new Entity( type );
+
+        first.setField( new StringField( "string", "I ate a sammich" ) );
+        first.setSize(100);
+
+        EntityUtils.setVersion( first, UUIDGenerator.newTimeUUID() );
+
+
+        final Entity second = new Entity( type );
+        second.setSize(100);
+
+        second.setField( new StringField( "string", "I drank a beer" ) );
+
+
+        EntityUtils.setVersion( second, UUIDGenerator.newTimeUUID() );
+
+
+        EntityIndexBatch batch = entityIndex.createBatch();
+
+
+        //get ordering, so 2 is before 1 when both match
+        IndexEdge indexScope1 = new IndexEdgeImpl( ownerId,type , SearchEdge.NodeType.SOURCE, 10 );
+        batch.index( indexScope1, first );
+
+
+        IndexEdge indexScope2 = new IndexEdgeImpl( ownerId, type+"er", SearchEdge.NodeType.SOURCE, 11 );
+        batch.index( indexScope2, second);
+
+
+        batch.execute().toBlocking().last();
+        entityIndex.refreshAsync().toBlocking().first();
+        long size = entityIndex.getEntitySize(type);
+        assertTrue( size == 100 );
+
+    }
+
 
 }