You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/27 00:38:31 UTC

[15/24] incubator-usergrid git commit: change config

change config

changing how core uses index promises

change future implementation

adding promises back


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8eb09e04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8eb09e04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8eb09e04

Branch: refs/heads/two-dot-o
Commit: 8eb09e04cd0a4d47719b0d6dd403b066bfafeecb
Parents: 2145f21
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 10:30:52 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 11:45:30 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  7 ++--
 .../corepersistence/CpRelationManager.java      | 13 +++---
 .../results/FilteringLoader.java                | 42 +++++++++----------
 .../batch/job/AbstractSchedulerRuntimeIT.java   |  2 +-
 .../persistence/core/future/BetterFuture.java   | 13 +++---
 .../persistence/index/EntityIndexBatch.java     |  2 +
 .../usergrid/persistence/index/IndexFig.java    | 16 ++++++-
 .../index/impl/CorePerformanceIT.java           | 44 ++++++++++----------
 .../impl/EntityConnectionIndexImplTest.java     |  6 +--
 9 files changed, 82 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 721ac80..7905c43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -975,11 +976,11 @@ public class CpEntityManager implements EntityManager {
         } );
 
 
-        ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
-
+        BetterFuture future = ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
         // update in all containing collections and connection indexes
         CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
         rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
+        future.get();
     }
 
 
@@ -2829,7 +2830,7 @@ public class CpEntityManager implements EntityManager {
         //
         //        batch.index(appAllTypesScope, memberEntity);
 
-        batch.execute();
+        batch.execute().get();
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 7be6dea..07fc45e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.core.future.BetterFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -441,7 +442,7 @@ public class CpRelationManager implements RelationManager {
                 } ).count().toBlocking().lastOrDefault( 0 );
 
 
-        entityIndexBatch.execute();
+        entityIndexBatch.execute().get();
 
         logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
     }
@@ -834,7 +835,7 @@ public class CpRelationManager implements RelationManager {
 
         batch.deindex( itemScope, cpHeadEntity );
 
-        batch.execute();
+        BetterFuture future = batch.execute();
 
         // remove edge from collection to item
         GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -870,9 +871,9 @@ public class CpRelationManager implements RelationManager {
                 }
             }
         }
+        future.get();
     }
 
-
     @Override
     public void copyRelationships(String srcRelationName, EntityRef dstEntityRef,
             String dstRelationName) throws Exception {
@@ -1060,13 +1061,15 @@ public class CpRelationManager implements RelationManager {
 //        batch.index( allTypesIndexScope, targetEntity );
 
 
-        batch.execute();
+        BetterFuture future = batch.execute();
 
         Keyspace ko = cass.getApplicationKeyspace( applicationId );
         Mutator<ByteBuffer> m = createMutator( ko, be );
         batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() );
         batchExecute( m, CassandraService.RETRY_COUNT );
 
+        future.get();
+
         return connection;
     }
 
@@ -1291,7 +1294,7 @@ public class CpRelationManager implements RelationManager {
 //
 //        batch.deindex( allTypesIndexScope, targetEntity );
 
-        batch.execute();
+        batch.execute().get();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index dca59e0..7848be5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -71,10 +71,10 @@ public class FilteringLoader implements ResultsLoader {
      * @param applicationScope The application scope to perform the load
      * @param indexScope The index scope used in the search
      */
-    protected FilteringLoader( 
-            final ManagerCache managerCache, 
-            final ResultsVerifier resultsVerifier,  
-            final ApplicationScope applicationScope, 
+    protected FilteringLoader(
+            final ManagerCache managerCache,
+            final ResultsVerifier resultsVerifier,
+            final ApplicationScope applicationScope,
             final IndexScope indexScope ) {
 
         this.managerCache = managerCache;
@@ -103,25 +103,25 @@ public class FilteringLoader implements ResultsLoader {
         // Maps the entity ids to our candidates
         final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
 
-        // Groups all candidate results by types.  When search connections there will be multiple 
+        // Groups all candidate results by types.  When search connections there will be multiple
         // types, so we want to batch fetch them more efficiently
-        
-        final HashMultimap<String, CandidateResult> groupedByScopes = 
+
+        final HashMultimap<String, CandidateResult> groupedByScopes =
                 HashMultimap.create( crs.size(), crs.size() );
 
         final Iterator<CandidateResult> iter = crs.iterator();
 
 
-        // TODO, in this case we're "optimizing" due to the limitations of collection scope.  
+        // TODO, in this case we're "optimizing" due to the limitations of collection scope.
         // Perhaps  we should change the API to just be an application, then an "owner" scope?
 
-        // Go through the candidates and group them by scope for more efficient retrieval.  
+        // Go through the candidates and group them by scope for more efficient retrieval.
         // Also remove duplicates before we even make a network call
         for ( int i = 0; iter.hasNext(); i++ ) {
 
             final CandidateResult currentCandidate = iter.next();
 
-            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType( 
+            final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
                     currentCandidate.getId().getType() );
 
             final Id entityId = currentCandidate.getId();
@@ -147,11 +147,11 @@ public class FilteringLoader implements ResultsLoader {
             if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
 
                 //de-index it
-                logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", 
-                    new Object[] { 
-                        entityId.getUuid(), 
-                        entityId.getType(), 
-                        previousMaxVersion, 
+                logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+                    new Object[] {
+                        entityId.getUuid(),
+                        entityId.getType(),
+                        previousMaxVersion,
                         currentVersion } );
 
                 //deindex this document, and remove the previous maxVersion
@@ -170,7 +170,7 @@ public class FilteringLoader implements ResultsLoader {
         }
 
 
-        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify 
+        //now everything is ordered, and older versions are removed.  Batch fetch versions to verify
         // existence and correct versions
 
         final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
@@ -193,10 +193,10 @@ public class FilteringLoader implements ResultsLoader {
 
             //now using the scope, load the collection
 
-            // Get the collection scope and batch load all the versions.  We put all entities in 
-            // app/app for easy retrieval/ unless persistence changes, we never want to read from 
+            // Get the collection scope and batch load all the versions.  We put all entities in
+            // app/app for easy retrieval/ unless persistence changes, we never want to read from
             // any scope other than the app, app, scope name scope
-            final CollectionScope collScope = new CollectionScopeImpl( 
+            final CollectionScope collScope = new CollectionScopeImpl(
                 applicationScope.getApplication(), applicationScope.getApplication(), scopeName);
 
             final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope);
@@ -225,7 +225,7 @@ public class FilteringLoader implements ResultsLoader {
         }
 
 
-         // NOTE DO NOT execute the batch here.  
+         // NOTE DO NOT execute the batch here.
         // It changes the results and we need consistent paging until we aggregate all results
         return resultsVerifier.getResults( sortedResults.values() );
     }
@@ -233,7 +233,7 @@ public class FilteringLoader implements ResultsLoader {
 
     @Override
     public void postProcess() {
-        this.indexBatch.execute();
+        this.indexBatch.execute().get();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
index 8825497..81836ae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
@@ -96,7 +96,7 @@ public class AbstractSchedulerRuntimeIT {
         JobSchedulerService jobScheduler = springResource.getBean( JobSchedulerService.class );
         jobScheduler.setJobListener( listener );
         if ( jobScheduler.state() != State.RUNNING ) {
-            jobScheduler.startAndWait();
+//            jobScheduler.startAndWait();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
index 6146fe8..201fa9a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -22,21 +22,22 @@ import java.util.concurrent.FutureTask;
 /**
  * Future without the exception nastiness
  */
-public  class BetterFuture<T>{
-    FutureTask<T> future;
+public  class BetterFuture<T> extends FutureTask<T> {
     public BetterFuture(Callable<T> callable){
-        future = new FutureTask<>(callable);
+        super(callable);
     }
 
     public void done(){
-        future.run();
+        run();
     }
 
     public T get(){
         try {
-            return future.get();
+            return super.get();
         }catch (Exception e){
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index a02d0da..bf606bc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -64,11 +64,13 @@ public interface EntityIndexBatch {
 
     /**
      * Execute the batch
+     * @return future to guarantee execution
      */
     public BetterFuture execute();
 
     /**
      * Execute the batch and force the refresh
+     * @return future to guarantee execution
      */
     public BetterFuture executeAndRefresh();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9bdac36..7434b99 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -121,15 +121,27 @@ public interface IndexFig extends GuicyFig {
     @Default("2")
     int getIndexCacheMaxWorkers();
 
+    /**
+     * how long to wait before the buffer flushes to send
+     * @return
+     */
     @Default("250")
     @Key( INDEX_BUFFER_TIMEOUT )
     int getIndexBufferTimeout();
 
-    @Default("100")
+    /**
+     * size of the buffer to build up before you send results
+     * @return
+     */
+    @Default("300")
     @Key( INDEX_BUFFER_SIZE )
     int getIndexBufferSize();
 
-    @Default("300")
+    /**
+     * Request batch size for ES
+     * @return
+     */
+    @Default("100")
     @Key( INDEX_BATCH_SIZE)
     int getIndexBatchSize();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
index 6d09ccb..c1bfe38 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
@@ -72,12 +72,12 @@ public class CorePerformanceIT extends BaseIT {
     public static ElasticSearchResource es = new ElasticSearchResource();
 
     // max entities we will write and read
-    static int maxEntities = 10; // TODO: make this configurable when you add Chop 
+    static int maxEntities = 10; // TODO: make this configurable when you add Chop
 
     // each app will get all data
     static int appCount = 10;
 
-    // number of threads = orgCount x appCount 
+    // number of threads = orgCount x appCount
 
     // total number of records = orgCount x appCount x numRecords
 
@@ -191,14 +191,14 @@ public class CorePerformanceIT extends BaseIT {
                 count += candidateResults.size();
 
                 //cause retrieval from cassandra
-                EntityResults entityResults = new EntityResults( 
+                EntityResults entityResults = new EntityResults(
                     candidateResults, ecm, UUIDGenerator.newTimeUUID() );
 
                 while(entityResults.hasNext()){
                     entityResults.next();
                 }
 
-                log.info("Read {} reviews in {} / {} ", new Object[] { 
+                log.info("Read {} reviews in {} / {} ", new Object[] {
                     count, indexScope.getOwner(), indexScope.getName() } );
             }
         }
@@ -216,7 +216,7 @@ public class CorePerformanceIT extends BaseIT {
 
         public void run() {
 
-            CollectionScope collectionScope = new CollectionScopeImpl( 
+            CollectionScope collectionScope = new CollectionScopeImpl(
                     applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() );
             EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope );
             EntityIndex eci = ecif.createEntityIndex(applicationScope );
@@ -232,7 +232,7 @@ public class CorePerformanceIT extends BaseIT {
 
             // create the first entry
             Entity current = new Entity(
-                new SimpleId(UUIDGenerator.newTimeUUID(), "review")); 
+                new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
 
 //            Id orgId = orgAppScope.scope.getApplication();
 //            Id appId = orgAppScope.scope.getOwner();
@@ -243,54 +243,54 @@ public class CorePerformanceIT extends BaseIT {
 
             try {
                 while ( (s = br.readLine()) != null && count < maxEntities ) {
-                    
+
                     try {
-                        
+
                         if ( s.trim().equals("")) { // then we are at end of a record
-                            
+
                             // write and index current entity
                             ecm.write( current ).toBlocking().last();
 
                             entityIndexBatch.index(indexScope, current  );
-                            
+
                             if ( maxEntities < 20 ) {
                                 log.info("Index written for {}", current.getId());
                                 log.info("---");
                             }
-                            
+
                             // create the next entity
                             current = new Entity(
                                     new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-                            
+
                             count++;
                             if(count % 1000 == 0){
-                                entityIndexBatch.execute();
+                                entityIndexBatch.execute().get();
                             }
 
                             if (count % 100000 == 0) {
-                                log.info("Indexed {} reviews in {} / {} ", 
-                                    new Object[] { 
-                                        count, 
+                                log.info("Indexed {} reviews in {} / {} ",
+                                    new Object[] {
+                                        count,
                                             applicationScope,
                                         indexScope.getOwner() } );
                             }
                             continue;
                         }
-                        
+
                         // process a field
                         String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase() ;
                         String value = s.substring( s.indexOf(":") + 1 ).trim();
-                        
+
                         if ( maxEntities < 20 ) {
                             log.info("Indexing {} = {}", name, value);
                         }
-                        
+
                         if ( NumberUtils.isNumber(value) && value.contains(".")) {
                             current.setField( new DoubleField( name, Double.parseDouble(value)));
-                            
+
                         } else if ( NumberUtils.isNumber(value) ) {
                             current.setField( new LongField( name, Long.parseLong(value)));
-                            
+
                         } else {
                             current.setField( new StringField( name, value.toString() ));
                         }
@@ -306,7 +306,7 @@ public class CorePerformanceIT extends BaseIT {
 
             eci.refresh();
         }
-    }   
+    }
 
 
     public void runSelectedQueries(final ApplicationScope scope,  List<IndexScope> indexScopes ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index b07bd21..c65f106 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -136,7 +136,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.index( searchScope, oj );
         batch.index( otherIndexScope, oj );
 
-        batch.executeAndRefresh();
+        batch.executeAndRefresh().get();
         personLikesIndex.refresh();
 
 
@@ -267,7 +267,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.index( searchScope, oj );
         batch.index( otherIndexScope, oj );
 
-        batch.executeAndRefresh();
+        batch.executeAndRefresh().get();
         personLikesIndex.refresh();
 
         EsTestUtils.waitForTasks( personLikesIndex );
@@ -287,7 +287,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
         batch.deindex( searchScope, egg );
         batch.deindex( searchScope, muffin );
         batch.deindex( searchScope, oj );
-        batch.executeAndRefresh();
+        batch.executeAndRefresh().get();
 
         likes = personLikesIndex.search( searchScope,
                 SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),