You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 21:01:02 UTC
[23/37] incubator-usergrid git commit: change config
change config
changing how core uses index promises
change future implementation
adding promises back
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8eb09e04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8eb09e04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8eb09e04
Branch: refs/heads/USERGRID-422
Commit: 8eb09e04cd0a4d47719b0d6dd403b066bfafeecb
Parents: 2145f21
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 10:30:52 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 11:45:30 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 7 ++--
.../corepersistence/CpRelationManager.java | 13 +++---
.../results/FilteringLoader.java | 42 +++++++++----------
.../batch/job/AbstractSchedulerRuntimeIT.java | 2 +-
.../persistence/core/future/BetterFuture.java | 13 +++---
.../persistence/index/EntityIndexBatch.java | 2 +
.../usergrid/persistence/index/IndexFig.java | 16 ++++++-
.../index/impl/CorePerformanceIT.java | 44 ++++++++++----------
.../impl/EntityConnectionIndexImplTest.java | 6 +--
9 files changed, 82 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 721ac80..7905c43 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -975,11 +976,11 @@ public class CpEntityManager implements EntityManager {
} );
- ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
-
+ BetterFuture future = ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
// update in all containing collections and connection indexes
CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
+ future.get();
}
@@ -2829,7 +2830,7 @@ public class CpEntityManager implements EntityManager {
//
// batch.index(appAllTypesScope, memberEntity);
- batch.execute();
+ batch.execute().get();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 7be6dea..07fc45e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -441,7 +442,7 @@ public class CpRelationManager implements RelationManager {
} ).count().toBlocking().lastOrDefault( 0 );
- entityIndexBatch.execute();
+ entityIndexBatch.execute().get();
logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
}
@@ -834,7 +835,7 @@ public class CpRelationManager implements RelationManager {
batch.deindex( itemScope, cpHeadEntity );
- batch.execute();
+ BetterFuture future = batch.execute();
// remove edge from collection to item
GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -870,9 +871,9 @@ public class CpRelationManager implements RelationManager {
}
}
}
+ future.get();
}
-
@Override
public void copyRelationships(String srcRelationName, EntityRef dstEntityRef,
String dstRelationName) throws Exception {
@@ -1060,13 +1061,15 @@ public class CpRelationManager implements RelationManager {
// batch.index( allTypesIndexScope, targetEntity );
- batch.execute();
+ BetterFuture future = batch.execute();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
Mutator<ByteBuffer> m = createMutator( ko, be );
batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() );
batchExecute( m, CassandraService.RETRY_COUNT );
+ future.get();
+
return connection;
}
@@ -1291,7 +1294,7 @@ public class CpRelationManager implements RelationManager {
//
// batch.deindex( allTypesIndexScope, targetEntity );
- batch.execute();
+ batch.execute().get();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index dca59e0..7848be5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -71,10 +71,10 @@ public class FilteringLoader implements ResultsLoader {
* @param applicationScope The application scope to perform the load
* @param indexScope The index scope used in the search
*/
- protected FilteringLoader(
- final ManagerCache managerCache,
- final ResultsVerifier resultsVerifier,
- final ApplicationScope applicationScope,
+ protected FilteringLoader(
+ final ManagerCache managerCache,
+ final ResultsVerifier resultsVerifier,
+ final ApplicationScope applicationScope,
final IndexScope indexScope ) {
this.managerCache = managerCache;
@@ -103,25 +103,25 @@ public class FilteringLoader implements ResultsLoader {
// Maps the entity ids to our candidates
final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
- // Groups all candidate results by types. When search connections there will be multiple
+ // Groups all candidate results by types. When search connections there will be multiple
// types, so we want to batch fetch them more efficiently
-
- final HashMultimap<String, CandidateResult> groupedByScopes =
+
+ final HashMultimap<String, CandidateResult> groupedByScopes =
HashMultimap.create( crs.size(), crs.size() );
final Iterator<CandidateResult> iter = crs.iterator();
- // TODO, in this case we're "optimizing" due to the limitations of collection scope.
+ // TODO, in this case we're "optimizing" due to the limitations of collection scope.
// Perhaps we should change the API to just be an application, then an "owner" scope?
- // Go through the candidates and group them by scope for more efficient retrieval.
+ // Go through the candidates and group them by scope for more efficient retrieval.
// Also remove duplicates before we even make a network call
for ( int i = 0; iter.hasNext(); i++ ) {
final CandidateResult currentCandidate = iter.next();
- final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
+ final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
currentCandidate.getId().getType() );
final Id entityId = currentCandidate.getId();
@@ -147,11 +147,11 @@ public class FilteringLoader implements ResultsLoader {
if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
//de-index it
- logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] {
- entityId.getUuid(),
- entityId.getType(),
- previousMaxVersion,
+ logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
+ new Object[] {
+ entityId.getUuid(),
+ entityId.getType(),
+ previousMaxVersion,
currentVersion } );
//deindex this document, and remove the previous maxVersion
@@ -170,7 +170,7 @@ public class FilteringLoader implements ResultsLoader {
}
- //now everything is ordered, and older versions are removed. Batch fetch versions to verify
+ //now everything is ordered, and older versions are removed. Batch fetch versions to verify
// existence and correct versions
final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
@@ -193,10 +193,10 @@ public class FilteringLoader implements ResultsLoader {
//now using the scope, load the collection
- // Get the collection scope and batch load all the versions. We put all entities in
- // app/app for easy retrieval/ unless persistence changes, we never want to read from
+ // Get the collection scope and batch load all the versions. We put all entities in
+ // app/app for easy retrieval/ unless persistence changes, we never want to read from
// any scope other than the app, app, scope name scope
- final CollectionScope collScope = new CollectionScopeImpl(
+ final CollectionScope collScope = new CollectionScopeImpl(
applicationScope.getApplication(), applicationScope.getApplication(), scopeName);
final EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collScope);
@@ -225,7 +225,7 @@ public class FilteringLoader implements ResultsLoader {
}
- // NOTE DO NOT execute the batch here.
+ // NOTE DO NOT execute the batch here.
// It changes the results and we need consistent paging until we aggregate all results
return resultsVerifier.getResults( sortedResults.values() );
}
@@ -233,7 +233,7 @@ public class FilteringLoader implements ResultsLoader {
@Override
public void postProcess() {
- this.indexBatch.execute();
+ this.indexBatch.execute().get();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
index 8825497..81836ae 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
@@ -96,7 +96,7 @@ public class AbstractSchedulerRuntimeIT {
JobSchedulerService jobScheduler = springResource.getBean( JobSchedulerService.class );
jobScheduler.setJobListener( listener );
if ( jobScheduler.state() != State.RUNNING ) {
- jobScheduler.startAndWait();
+// jobScheduler.startAndWait();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
index 6146fe8..201fa9a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/BetterFuture.java
@@ -22,21 +22,22 @@ import java.util.concurrent.FutureTask;
/**
* Future without the exception nastiness
*/
-public class BetterFuture<T>{
- FutureTask<T> future;
+public class BetterFuture<T> extends FutureTask<T> {
public BetterFuture(Callable<T> callable){
- future = new FutureTask<>(callable);
+ super(callable);
}
public void done(){
- future.run();
+ run();
}
public T get(){
try {
- return future.get();
+ return super.get();
}catch (Exception e){
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index a02d0da..bf606bc 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -64,11 +64,13 @@ public interface EntityIndexBatch {
/**
* Execute the batch
+ * @return future to guarantee execution
*/
public BetterFuture execute();
/**
* Execute the batch and force the refresh
+ * @return future to guarantee execution
*/
public BetterFuture executeAndRefresh();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 9bdac36..7434b99 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -121,15 +121,27 @@ public interface IndexFig extends GuicyFig {
@Default("2")
int getIndexCacheMaxWorkers();
+ /**
+ * How long to wait before the buffer is flushed and its contents are sent.
+ * @return
+ */
@Default("250")
@Key( INDEX_BUFFER_TIMEOUT )
int getIndexBufferTimeout();
- @Default("100")
+ /**
+ * Size the buffer must build up to before its results are sent.
+ * @return
+ */
+ @Default("300")
@Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize();
- @Default("300")
+ /**
+ * Request batch size for Elasticsearch (ES).
+ * @return
+ */
+ @Default("100")
@Key( INDEX_BATCH_SIZE)
int getIndexBatchSize();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
index 6d09ccb..c1bfe38 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/CorePerformanceIT.java
@@ -72,12 +72,12 @@ public class CorePerformanceIT extends BaseIT {
public static ElasticSearchResource es = new ElasticSearchResource();
// max entities we will write and read
- static int maxEntities = 10; // TODO: make this configurable when you add Chop
+ static int maxEntities = 10; // TODO: make this configurable when you add Chop
// each app will get all data
static int appCount = 10;
- // number of threads = orgCount x appCount
+ // number of threads = orgCount x appCount
// total number of records = orgCount x appCount x numRecords
@@ -191,14 +191,14 @@ public class CorePerformanceIT extends BaseIT {
count += candidateResults.size();
//cause retrieval from cassandra
- EntityResults entityResults = new EntityResults(
+ EntityResults entityResults = new EntityResults(
candidateResults, ecm, UUIDGenerator.newTimeUUID() );
while(entityResults.hasNext()){
entityResults.next();
}
- log.info("Read {} reviews in {} / {} ", new Object[] {
+ log.info("Read {} reviews in {} / {} ", new Object[] {
count, indexScope.getOwner(), indexScope.getName() } );
}
}
@@ -216,7 +216,7 @@ public class CorePerformanceIT extends BaseIT {
public void run() {
- CollectionScope collectionScope = new CollectionScopeImpl(
+ CollectionScope collectionScope = new CollectionScopeImpl(
applicationScope.getApplication(), indexScope.getOwner(), indexScope.getName() );
EntityCollectionManager ecm = ecmf.createCollectionManager(collectionScope );
EntityIndex eci = ecif.createEntityIndex(applicationScope );
@@ -232,7 +232,7 @@ public class CorePerformanceIT extends BaseIT {
// create the first entry
Entity current = new Entity(
- new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
+ new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
// Id orgId = orgAppScope.scope.getApplication();
// Id appId = orgAppScope.scope.getOwner();
@@ -243,54 +243,54 @@ public class CorePerformanceIT extends BaseIT {
try {
while ( (s = br.readLine()) != null && count < maxEntities ) {
-
+
try {
-
+
if ( s.trim().equals("")) { // then we are at end of a record
-
+
// write and index current entity
ecm.write( current ).toBlocking().last();
entityIndexBatch.index(indexScope, current );
-
+
if ( maxEntities < 20 ) {
log.info("Index written for {}", current.getId());
log.info("---");
}
-
+
// create the next entity
current = new Entity(
new SimpleId(UUIDGenerator.newTimeUUID(), "review"));
-
+
count++;
if(count % 1000 == 0){
- entityIndexBatch.execute();
+ entityIndexBatch.execute().get();
}
if (count % 100000 == 0) {
- log.info("Indexed {} reviews in {} / {} ",
- new Object[] {
- count,
+ log.info("Indexed {} reviews in {} / {} ",
+ new Object[] {
+ count,
applicationScope,
indexScope.getOwner() } );
}
continue;
}
-
+
// process a field
String name = s.substring( 0, s.indexOf(":")).replace("/", "_").toLowerCase() ;
String value = s.substring( s.indexOf(":") + 1 ).trim();
-
+
if ( maxEntities < 20 ) {
log.info("Indexing {} = {}", name, value);
}
-
+
if ( NumberUtils.isNumber(value) && value.contains(".")) {
current.setField( new DoubleField( name, Double.parseDouble(value)));
-
+
} else if ( NumberUtils.isNumber(value) ) {
current.setField( new LongField( name, Long.parseLong(value)));
-
+
} else {
current.setField( new StringField( name, value.toString() ));
}
@@ -306,7 +306,7 @@ public class CorePerformanceIT extends BaseIT {
eci.refresh();
}
- }
+ }
public void runSelectedQueries(final ApplicationScope scope, List<IndexScope> indexScopes ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8eb09e04/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index b07bd21..c65f106 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -136,7 +136,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.index( searchScope, oj );
batch.index( otherIndexScope, oj );
- batch.executeAndRefresh();
+ batch.executeAndRefresh().get();
personLikesIndex.refresh();
@@ -267,7 +267,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.index( searchScope, oj );
batch.index( otherIndexScope, oj );
- batch.executeAndRefresh();
+ batch.executeAndRefresh().get();
personLikesIndex.refresh();
EsTestUtils.waitForTasks( personLikesIndex );
@@ -287,7 +287,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.deindex( searchScope, egg );
batch.deindex( searchScope, muffin );
batch.deindex( searchScope, oj );
- batch.executeAndRefresh();
+ batch.executeAndRefresh().get();
likes = personLikesIndex.search( searchScope,
SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),