You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/21 22:12:13 UTC

[02/50] usergrid git commit: Removed search from refresh; need to use queue

Removed search from refresh; need to use queue.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8614a68
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8614a68
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8614a68

Branch: refs/heads/jacoco
Commit: f8614a68f112469713874f032e40ab9fcc9b24d8
Parents: 2d23aa6
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 9 15:28:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 9 15:28:33 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |  43 ++++--
 .../asyncevents/EventBuilderImpl.java           |  10 +-
 .../org/apache/usergrid/CoreApplication.java    |  14 +-
 .../corepersistence/StaleIndexCleanupTest.java  |   8 +-
 .../persistence/ApplicationServiceIT.java       |   9 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 134 +++----------------
 .../persistence/queue/DefaultQueueManager.java  |   2 +-
 7 files changed, 76 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index d46c112..643a2b8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -41,6 +41,11 @@ import org.apache.usergrid.corepersistence.service.ConnectionService;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
+import org.apache.usergrid.persistence.index.utils.*;
+import org.apache.usergrid.utils.*;
+import org.apache.usergrid.utils.ClassUtils;
+import org.apache.usergrid.utils.ConversionUtils;
+import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.Assert;
@@ -102,11 +107,6 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.apache.usergrid.persistence.model.field.StringField;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.utils.ClassUtils;
-import org.apache.usergrid.utils.CompositeUtils;
-import org.apache.usergrid.utils.Inflector;
-import org.apache.usergrid.utils.StringUtils;
-import org.apache.usergrid.utils.UUIDUtils;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
@@ -2874,13 +2874,36 @@ public class CpEntityManager implements EntityManager {
      * TODO, these 3 methods are super janky.  During refactoring we should clean this model up
      */
     public IndexRefreshCommand.IndexRefreshCommandInfo refreshIndex() {
+        try {
+            long start = System.currentTimeMillis();
+            // refresh special indexes without calling EntityManager refresh because stack overflow
+            Map<String, Object> map = new org.apache.usergrid.persistence.index.utils.MapUtils.HashMapBuilder<>();
+            map.put("some prop", "test");
+            boolean hasFinished = false;
+            Entity refreshEntity = create("refresh", map);
+            try {
+                for (int i = 0; i < 10; i++) {
+                    if (searchCollection(
+                        new SimpleEntityRef(org.apache.usergrid.persistence.entities.Application.ENTITY_TYPE, getApplicationId()),
+                        InflectionUtils.pluralize("refresh"),
+                        Query.fromQL("select * where uuid='" + refreshEntity.getUuid() + "'")
+                    ).size() > 0
+                        ) {
+                        hasFinished = true;
+                        break;
+                    }
+                    Thread.sleep(250);
+                    return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
+                }
+            }finally {
+                delete(refreshEntity);
+            }
+            return new IndexRefreshCommand.IndexRefreshCommandInfo(hasFinished,System.currentTimeMillis() - start);
+        } catch (Exception e) {
+            throw new RuntimeException("refresh failed",e);
+        }
 
-        // refresh special indexes without calling EntityManager refresh because stack overflow
-
-        return managerCache.getEntityIndex(applicationScope).refreshAsync().toBlocking().first();
     }
-
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4bf5695..bc72207 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -108,10 +108,12 @@ public class EventBuilderImpl implements EventBuilder {
         log.debug( "Deleting in app scope {} with edge {} }", applicationScope, edge );
 
         final Observable<IndexOperationMessage> edgeObservable =
-            indexService.deleteIndexEdge( applicationScope, edge ).flatMap( batch -> {
-                final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-                return gm.deleteEdge( edge ).map( deletedEdge -> batch );
-            } );
+            indexService.deleteIndexEdge( applicationScope, edge )
+                .map( batch -> {
+                    final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+                    gm.deleteEdge(edge).toBlocking().lastOrDefault(null);
+                    return batch;
+                } );
 
         return edgeObservable;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
index 9c96fb8..d207894 100644
--- a/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
+++ b/stack/core/src/test/java/org/apache/usergrid/CoreApplication.java
@@ -17,10 +17,7 @@
 package org.apache.usergrid;
 
 
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 
 import com.google.inject.Injector;
@@ -28,8 +25,10 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.service.ApplicationService;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.utils.MapUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.InflectionUtils;
 import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -235,14 +234,11 @@ public class CoreApplication implements Application, TestRule {
         //Insert test entity and find it
         setup.getEmf().refreshIndex(CpNamingUtils.getManagementApplicationId().getUuid());
 
-        if(!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
+        if (!em.getApplicationId().equals(CpNamingUtils.getManagementApplicationId().getUuid())) {
             setup.getEmf().refreshIndex(em.getApplicationId());
         }
-        try {
-            Thread.sleep(2000);
-        }catch (Exception e){
 
-        }
+        em.refreshIndex();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 3e46e4f..df93e68 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -321,8 +321,8 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
         final EntityManager em = app.getEntityManager();
 
-        final int numEntities = 20;
-        final int numUpdates = 40;
+        final int numEntities = 5;
+        final int numUpdates = 5;
 
         // create lots of entities
         final List<Entity> things = new ArrayList<Entity>(numEntities);
@@ -348,7 +348,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
 
             for ( int j=0; j<numUpdates; j++) {
                 toUpdate = em.get( thing.getUuid() );
-                toUpdate.setProperty( "property"  + j, RandomStringUtils.randomAlphanumeric(10));
+                toUpdate.setProperty( "property"  + j, UUID.randomUUID().toString());
 
                 em.update(toUpdate);
 
@@ -367,9 +367,7 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
             if(numEntities * (numUpdates + 1) == crs.size()){
                 break;
             }
-            Thread.sleep(250);
             crs = queryCollectionCp("things", "thing", "select *");
-
         }
 
 //        Assert.assertEquals("Expect stale candidates", numEntities * (numUpdates + 1), crs.size());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
index d870114..f8079e5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/ApplicationServiceIT.java
@@ -76,7 +76,6 @@ public class ApplicationServiceIT extends AbstractCoreIT {
         count = ids.count().toBlocking().last();
         Assert.assertEquals(count, 5);
         this.app.refreshIndex();
-        Thread.sleep(5000);
         Injector injector = SpringResource.getInstance().getBean(Injector.class);
         GraphManagerFactory factory = injector.getInstance(GraphManagerFactory.class);
         GraphManager graphManager = factory.createEdgeManager(appScope);
@@ -88,7 +87,13 @@ public class ApplicationServiceIT extends AbstractCoreIT {
 
         Iterator<Edge> results = graphManager.loadEdgesFromSource(simpleSearchByEdgeType).toBlocking().getIterator();
         if(results.hasNext()){
-            Assert.fail("should be empty");
+            int i = 0;
+
+            while(results.hasNext()){
+                results.next();
+                i++;
+            }
+            Assert.fail("should be empty but has "+i);
 
         }else{
             Results searchCollection = entityManager.searchCollection(entityManager.getApplication(), "tests", Query.all());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 087eefe..6b8b024 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -85,118 +85,26 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
         final long start = System.currentTimeMillis();
 
-
-        //id to hunt for
-        final UUID uuid = UUIDUtils.newTimeUUID();
-        final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
-        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-        final Id appId = new SimpleId( "ug_refresh_index" );
-        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
-        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
-        final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
-        final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
-        final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
-        //add a tracer record
-        IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
-        //save the item
-        final IndexOperationMessage message = new IndexOperationMessage();
-        message.addIndexRequest( indexRequest );
-
-        //add the record to the index
-        final Observable<IndexOperationMessage> addRecord = producer.put( message );
-
-        //refresh the index
-        //        final Observable<Boolean> refresh = refresh( indexes );
-
-        /**
-         * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
-         */
-        //set our filter for entityId fieldname
-
-
-        /**
-         * We want to search once we've added our record, then refreshed
-         */
-        final Observable<IndexRefreshCommandInfo> searchObservable =
-            Observable.create(sub -> {
-                try {
-                    IndexRefreshCommandInfo info = null;
-                    for(int i = 0; i<indexFig.maxRefreshSearches();i++) {
-                        final SearchRequestBuilder builder = esProvider.getClient().prepareSearch(alias.getReadAlias())
-                            .setTypes(IndexingUtils.ES_ENTITY_TYPE)
-                            .setPostFilter(FilterBuilders
-                                .termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
-                                    entityId));
-
-                        info = new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0,
-                            System.currentTimeMillis() - start);
-                        if(info.hasFinished()){
-                            break;
-                        }else {
-                            Thread.sleep(50);
-                        }
-                    }
-                    sub.onNext(info);
-                    sub.onCompleted();
-                } catch (Exception ee) {
-                    logger.error("Failed during refresh search for " + uuid, ee);
-                    throw new RuntimeException("Failed during refresh search for " + uuid, ee);
-                }
-            });
-
-
-        //chain it all together
-
-        //add the record, take it's last result.  On the last add, we then execute the refresh command
-
-        final Observable<IndexRefreshCommandInfo> refreshResults = addRecord
-
-            //after our add, run a refresh
-            .doOnNext( addResult -> {
-
-
-                if ( indexes.length == 0 ) {
-                    logger.debug( "Not refreshing indexes. none found" );
-                }
-                //Added For Graphite Metrics
-                RefreshResponse response =
-                    esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
-                int failedShards = response.getFailedShards();
-                int successfulShards = response.getSuccessfulShards();
-                ShardOperationFailedException[] sfes = response.getShardFailures();
-                if ( sfes != null ) {
-                    for ( ShardOperationFailedException sfe : sfes ) {
-                        logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() );
-                    }
-                }
-                logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ),
-                    successfulShards, failedShards);
-            })
-
-                //once the refresh is done execute the search
-            .flatMap(refreshCommandResult -> searchObservable)
-
-                //check when found
-            .doOnNext(found -> {
-                if (!found.hasFinished()) {
-                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid,
-                        found.getExecutionTime());
-                } else {
-                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-                }
-            }).doOnCompleted(() -> {
-                //clean up our data
-                String[] aliases = indexCache.getIndexes(alias, EntityIndex.AliasType.Read);
-                DeIndexOperation deIndexRequest =
-                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-
-                //delete the item
-                IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
-                indexOperationMessage.addDeIndexRequest( deIndexRequest );
-                producer.put( indexOperationMessage ).subscribe();
-            } );
-
-
-        return ObservableTimer.time( refreshResults, timer ) ;
+        if (indexes.length == 0) {
+            logger.debug("Not refreshing indexes. none found");
+        }
+        //Added For Graphite Metrics
+        RefreshResponse response =
+            esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+        int failedShards = response.getFailedShards();
+        int successfulShards = response.getSuccessfulShards();
+        ShardOperationFailedException[] sfes = response.getShardFailures();
+        if (sfes != null) {
+            for (ShardOperationFailedException sfe : sfes) {
+                logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+            }
+        }
+        logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "),
+            successfulShards, failedShards);
+
+        IndexRefreshCommandInfo refreshResults = new IndexRefreshCommandInfo(failedShards == 0,
+            System.currentTimeMillis() - start);
+
+        return ObservableTimer.time(Observable.just(refreshResults), timer);
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8614a68/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index f36d3c1..edd3b6b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -37,7 +37,7 @@ public class DefaultQueueManager implements QueueManager {
     @Override
     public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
-        queue.drainTo(returnQueue);
+        queue.drainTo(returnQueue,1);
         return Observable.from( returnQueue);
     }