You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/21 22:12:13 UTC
[02/50] usergrid git commit: removed search from refresh; need to use queue
removed search from refresh; need to use queue
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8614a68
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8614a68
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8614a68
Branch: refs/heads/jacoco
Commit: f8614a68f112469713874f032e40ab9fcc9b24d8
Parents: 2d23aa6
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 15:28:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 15:28:33 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 43 ++++--
.../asyncevents/EventBuilderImpl.java | 10 +-
.../org/apache/usergrid/CoreApplication.java | 14 +-
.../corepersistence/StaleIndexCleanupTest.java | 8 +-
.../persistence/ApplicationServiceIT.java | 9 +-
.../index/impl/IndexRefreshCommandImpl.java | 134 +++----------------
.../persistence/queue/DefaultQueueManager.java | 2 +-
7 files changed, 76 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d46c112..643a2b8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -41,6 +41,11 @@ import org.apache.usergrid.corepersistence.service.ConnectionService;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.IndexRefreshCommand;
+import org.apache.usergrid.persistence.index.utils.*;
+import org.apache.usergrid.utils.*;
+import org.apache.usergrid.utils.ClassUtils;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -102,11 +107,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.utils.ClassUtils;
-import org.apache.usergrid.utils.CompositeUtils;
-import org.apache.usergrid.utils.Inflector;
-import org.apache.usergrid.utils.StringUtils;
-import org.apache.usergrid.utils.UUIDUtils;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
@@ -2874,13 +2874,36 @@ public class CpEntityManager implements EntityManager {
* TODO, these 3 methods are super janky. During refactoring we should clean this model up
*/
public IndexRefreshCommand.IndexRefreshCommandInfo refreshIndex() {
+ try {
+ long start = System.currentTimeMillis();
+ // refresh special indexes without calling EntityManager refresh because stack overflow
+ Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
+ map.put("some prop", "test");
+ boolean hasFinished = false;
+ Entity refreshEntity = create("refresh", map);
+ try {
+ for (int i = 0; i < 10; i++) {
+ if (searchCollection(
+ new SimpleEntityRef(org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
+ InflectionUtils.pluralize("refresh"),
+ Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
+ ).size() > 0
+ ) {
+ hasFinished = true;
+ break;
+ }
+ Thread.sleep(250);
+ return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
+ }
+ }finally {
+ delete(refreshEntity);
+ }
+ return new IndexRefreshCommand.IndexRefreshCommandInfo(hasFinished,System.currentTimeMillis() - start);
+ } catch (Exception e) {
+ throw new RuntimeException("refresh failed",e);
+ }
- // refresh special indexes without calling EntityManager refresh because stack overflow
-
- return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
}
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4bf5695..bc72207 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -108,10 +108,12 @@ public class EventBuilderImpl implements EventBuilder {
log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
final Observable<IndexOperationMessage> edgeObservable =
- indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
- final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return gm.deleteEdge( edge ).map( deletedEdge -> batch );
- } );
+ indexService.deleteIndexEdge( applicationScope, edge )
+ .map( batch -> {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+ gm.deleteEdge(edge).toBlocking().lastOrDefault(null);
+ return batch;
+ } );
return edgeObservable;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9c96fb8..d207894 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -17,10 +17,7 @@
package org.apache.usergrid;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import com.google.inject.Injector;
@@ -28,8 +25,10 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.service.ApplicationService;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.utils.MapUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.InflectionUtils;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -235,14 +234,11 @@ public class CoreApplication implements Application, TestRule {
//Insert test entity and find it
setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
- if(!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
+ if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
setup.getEmf().refreshIndex(em.getApplicationId());
}
- try {
- Thread.sleep(2000);
- }catch (Exception e){
- }
+ em.refreshIndex();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 3e46e4f..df93e68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -321,8 +321,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
final EntityManager em = app.getEntityManager();
- final int numEntities = 20;
- final int numUpdates = 40;
+ final int numEntities = 5;
+ final int numUpdates = 5;
// create lots of entities
final List<Entity> things = new ArrayList<Entity>(numEntities);
@@ -348,7 +348,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
for ( int j=0; j<numUpdates; j++) {
toUpdate = em.get( thing.getUuid() );
- toUpdate.setProperty( "property" + j, RandomStringUtils.randomAlphanumeric(10));
+ toUpdate.setProperty( "property" + j, UUID.randomUUID().toString());
em.update(toUpdate);
@@ -367,9 +367,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
if(numEntities * (numUpdates + 1) == crs.size()){
break;
}
- Thread.sleep(250);
crs = queryCollectionCp("things", "thing", "select *");
-
}
// Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index d870114..f8079e5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -76,7 +76,6 @@ public class ApplicationServiceIT extends AbstractCoreIT {
count = ids.count().toBlocking().last();
Assert.assertEquals(count, 5);
this.app.refreshIndex();
- Thread.sleep(5000);
Injector injector = SpringResource.getInstance().getBean(Injector.class);
GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
GraphManager graphManager = factory.createEdgeManager(appScope);
@@ -88,7 +87,13 @@ public class ApplicationServiceIT extends AbstractCoreIT {
Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
if(results.hasNext()){
- Assert.fail("should be empty");
+ int i = 0;
+
+ while(results.hasNext()){
+ results.next();
+ i++;
+ }
+ Assert.fail("should be empty but has "+i);
}else{
Results searchCollection = entityManager.searchCollection(entityManager.getApplication(), "tests", Query.all());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 087eefe..6b8b024 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -85,118 +85,26 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
final long start = System.currentTimeMillis();
-
- //id to hunt for
- final UUID uuid = UUIDUtils.newTimeUUID();
- final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
- EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
- final Id appId = new SimpleId( "ug_refresh_index" );
- final ApplicationScope appScope = new ApplicationScopeImpl( appId );
- final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
- final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
- final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
- final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
- //add a tracer record
- IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
- //save the item
- final IndexOperationMessage message = new IndexOperationMessage();
- message.addIndexRequest( indexRequest );
-
- //add the record to the index
- final Observable<IndexOperationMessage> addRecord = producer.put( message );
-
- //refresh the index
- // final Observable<Boolean> refresh = refresh( indexes );
-
- /**
- * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search
- */
- //set our filter for entityId fieldname
-
-
- /**
- * We want to search once we've added our record, then refreshed
- */
- final Observable<IndexRefreshCommandInfo> searchObservable =
- Observable.create(sub -> {
- try {
- IndexRefreshCommandInfo info = null;
- for(int i = 0; i<indexFig.maxRefreshSearches();i++) {
- final SearchRequestBuilder builder = esProvider.getClient().prepareSearch(alias.getReadAlias())
- .setTypes(IndexingUtils.ES_ENTITY_TYPE)
- .setPostFilter(FilterBuilders
- .termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
- entityId));
-
- info = new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0,
- System.currentTimeMillis() - start);
- if(info.hasFinished()){
- break;
- }else {
- Thread.sleep(50);
- }
- }
- sub.onNext(info);
- sub.onCompleted();
- } catch (Exception ee) {
- logger.error("Failed during refresh search for " + uuid, ee);
- throw new RuntimeException("Failed during refresh search for " + uuid, ee);
- }
- });
-
-
- //chain it all together
-
- //add the record, take it's last result. On the last add, we then execute the refresh command
-
- final Observable<IndexRefreshCommandInfo> refreshResults = addRecord
-
- //after our add, run a refresh
- .doOnNext( addResult -> {
-
-
- if ( indexes.length == 0 ) {
- logger.debug( "Not refreshing indexes. none found" );
- }
- //Added For Graphite Metrics
- RefreshResponse response =
- esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
- int failedShards = response.getFailedShards();
- int successfulShards = response.getSuccessfulShards();
- ShardOperationFailedException[] sfes = response.getShardFailures();
- if ( sfes != null ) {
- for ( ShardOperationFailedException sfe : sfes ) {
- logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() );
- }
- }
- logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ),
- successfulShards, failedShards);
- })
-
- //once the refresh is done execute the search
- .flatMap(refreshCommandResult -> searchObservable)
-
- //check when found
- .doOnNext(found -> {
- if (!found.hasFinished()) {
- logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid,
- found.getExecutionTime());
- } else {
- logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
- }
- }).doOnCompleted(() -> {
- //clean up our data
- String[] aliases = indexCache.getIndexes(alias, EntityIndex.AliasType.Read);
- DeIndexOperation deIndexRequest =
- new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-
- //delete the item
- IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
- indexOperationMessage.addDeIndexRequest( deIndexRequest );
- producer.put( indexOperationMessage ).subscribe();
- } );
-
-
- return ObservableTimer.time( refreshResults, timer ) ;
+ if (indexes.length == 0) {
+ logger.debug("Not refreshing indexes. none found");
+ }
+ //Added For Graphite Metrics
+ RefreshResponse response =
+ esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+ int failedShards = response.getFailedShards();
+ int successfulShards = response.getSuccessfulShards();
+ ShardOperationFailedException[] sfes = response.getShardFailures();
+ if (sfes != null) {
+ for (ShardOperationFailedException sfe : sfes) {
+ logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+ }
+ }
+ logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "),
+ successfulShards, failedShards);
+
+ IndexRefreshCommandInfo refreshResults = new IndexRefreshCommandInfo(failedShards == 0,
+ System.currentTimeMillis() - start);
+
+ return ObservableTimer.time(Observable.just(refreshResults), timer);
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index f36d3c1..edd3b6b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -37,7 +37,7 @@ public class DefaultQueueManager implements QueueManager {
@Override
public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
List<QueueMessage> returnQueue = new ArrayList<>();
- queue.drainTo(returnQueue);
+ queue.drainTo(returnQueue,1);
return Observable.from( returnQueue);
}