You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/04 23:42:33 UTC

incubator-usergrid git commit: Updated index refresh to be more procedural. Still a race condition on index processing and flush.

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 9b939d15b -> 1e3de5725


Updated index refresh to be more procedural.  Still a race condition on index processing and flush.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1e3de572
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1e3de572
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1e3de572

Branch: refs/heads/two-dot-o-dev
Commit: 1e3de5725ce2fff16e44a4b1b068269d197411eb
Parents: 9b939d1
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 15:42:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 15:42:22 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        |   2 -
 .../usergrid/persistence/CollectionIT.java      |   2 +
 .../core/future/FutureObservable.java           |   6 +-
 .../persistence/index/IndexRefreshCommand.java  |   2 +-
 .../index/impl/EsEntityIndexImpl.java           |   2 -
 .../index/impl/EsIndexBufferConsumerImpl.java   |  13 +-
 .../index/impl/IndexBufferConsumer.java         |   2 +-
 .../index/impl/IndexRefreshCommandImpl.java     | 170 +++++++++----------
 8 files changed, 94 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6ffefe3..7d003cc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -579,8 +579,6 @@ public class CpEntityManager implements EntityManager {
         // first, update entity index in its own collection scope
 
         updateEntityMeter.mark();
-        Timer.Context timer = updateEntityTimer.time();
-
 
 
         Id entityId = new SimpleId( entity.getUuid(), entity.getType() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index fcdc11f..8c94d32 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -363,6 +363,8 @@ public class CollectionIT extends AbstractCoreIT {
 
         app.refreshIndex();
 
+//        Thread.sleep(500);
+
         final Query query = Query.fromQL( "nickname = 'ed'" );
 
         Results r = em.searchCollection( group, "users", query.withResultsLevel( Level.LINKED_PROPERTIES ) );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
index f86ffc1..06eed4d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
@@ -27,13 +27,11 @@ import java.util.concurrent.FutureTask;
  */
 public class FutureObservable<T> {
 
-    private final T returnVal;
     private final FutureTask<T> future;
 
 
     public FutureObservable(final T returnVal) {
-        this.returnVal = returnVal;
-        future = new FutureTask<T>( () -> returnVal );
+        future = new FutureTask<>( () -> returnVal );
     }
 
     public void done() {
@@ -41,6 +39,6 @@ public class FutureObservable<T> {
     }
 
     public Observable<T> observable() {
-        return !future.isDone() ? Observable.from(future) : Observable.just(returnVal);
+        return  Observable.from(future);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
index 03be233..af7f814 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
@@ -28,7 +28,7 @@ public interface IndexRefreshCommand {
 
     Observable<IndexRefreshCommandInfo> execute(String[] indexes);
 
-    public static class IndexRefreshCommandInfo{
+    class IndexRefreshCommandInfo{
         private final boolean hasFinished;
         private final long executionTime;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 20a940f..904f58b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -323,8 +323,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
     public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() {
 
         refreshIndexMeter.mark();
-        Observable future = producer.put(new IndexOperationMessage());
-        future.toBlocking().last();
         return indexRefreshCommand.execute(getUniqueIndexes());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index b2433bd..f1c493a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -108,7 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     }
 
 
-    public Observable put( IndexOperationMessage message ) {
+    public Observable<IndexOperationMessage>  put( IndexOperationMessage message ) {
         Preconditions.checkNotNull(message, "Message cannot be null");
         indexSizeCounter.inc( message.getDeIndexRequests().size() );
         indexSizeCounter.inc( message.getIndexRequests().size() );
@@ -129,7 +129,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         //buffer on our new thread with a timeout
         observable.buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
-            Schedulers.newThread() ).flatMap( indexOpBuffer -> {
+            Schedulers.io() ).flatMap( indexOpBuffer -> {
 
             //hand off to processor in new observable thread so we can continue to buffer faster
             return Observable.just( indexOpBuffer ).flatMap(
@@ -199,13 +199,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
 
         //subscribe to the operations that generate requests on a new thread so that we can execute them quickly
         //mark this as done
-        return processedIndexOperations.doOnNext( processedIndexOp ->
-            {
-                processedIndexOp.done();
-            }
-        ).doOnError(t -> {
-            log.error("Unable to ack futures", t);
-        });
+        return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done()
+        ).doOnError(t -> log.error("Unable to ack futures", t) );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index 6898f15..2906570 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -34,5 +34,5 @@ public interface IndexBufferConsumer {
      * @param message
      * @return
      */
-    Observable put(IndexOperationMessage message);
+    Observable<IndexOperationMessage>  put(IndexOperationMessage message);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 71a05a0..34f0e6e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,26 +20,21 @@
 package org.apache.usergrid.persistence.index.impl;
 
 
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
-import com.amazonaws.services.elastictranscoder.model.TimeSpan;
-import org.apache.usergrid.persistence.core.util.StringUtils;
 import org.elasticsearch.action.ShardOperationFailedException;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.indices.IndexMissingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.util.StringUtils;
 import org.apache.usergrid.persistence.index.AliasedEntityIndex;
 import org.apache.usergrid.persistence.index.IndexEdge;
 import org.apache.usergrid.persistence.index.IndexFig;
@@ -56,7 +51,6 @@ import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 
 import rx.Observable;
-import rx.Subscriber;
 import rx.util.async.Async;
 
 
@@ -72,13 +66,12 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
     private final IndexBufferConsumer producer;
     private final IndexFig indexFig;
     private final Timer timer;
-    private final RxTaskScheduler rxTaskScheduler;
+
 
     @Inject
     public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider,
                                     IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory,
-                                    final IndexCache indexCache, final RxTaskScheduler rxTaskScheduler ) {
-
+                                    final IndexCache indexCache ) {
 
 
         this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh.timer" );
@@ -87,17 +80,16 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
         this.producer = producer;
         this.indexFig = indexFig;
         this.indexCache = indexCache;
-        this.rxTaskScheduler = rxTaskScheduler;
     }
 
 
     @Override
-    public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
+    public Observable<IndexRefreshCommandInfo> execute( String[] indexes ) {
 
 
         final long start = System.currentTimeMillis();
 
-        Timer.Context refreshTimer = timer.time();
+
         //id to hunt for
         final UUID uuid = UUIDUtils.newTimeUUID();
         final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
@@ -113,90 +105,96 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
         //save the item
         final IndexOperationMessage message = new IndexOperationMessage();
         message.addIndexRequest( indexRequest );
-        final Observable addRecord = producer.put(message);
-        final Observable refresh = refresh(indexes);
+
+        //add the record to the index
+        final Observable<IndexOperationMessage> addRecord = producer.put( message );
+
+        //refresh the index
+        //        final Observable<Boolean> refresh = refresh( indexes );
 
         /**
          * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
          */
         //set our filter for entityId fieldname
-        final SearchRequestBuilder builder =
-            esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
-                .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
-
-
-        //start our processing immediately
-        final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
-            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
-                .range(0, indexFig.maxRefreshSearches())
-                .map(i ->
-                {
-                    try {
-                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
-                    } catch (Exception ee) {
-                        logger.error("Failed during refresh search for " + uuid, ee);
-                        throw new RuntimeException("Failed during refresh search for " + uuid, ee);
-                    }
-                })
-                .takeWhile(info -> info.hasFinished())
-                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
-
-            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
-            combined.toBlocking().last();
-            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
-            return info;
-        },rxTaskScheduler.getAsyncIOScheduler()).call();
-
-
-            return future.doOnNext(found -> {
-                if (!found.hasFinished()) {
-                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-                } else {
-                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+
+
+        /**
+         * We want to search once we've added our record, then refreshed
+         */
+        final Observable<IndexRefreshCommandInfo> searchObservable =
+            Observable.range( 0, indexFig.maxRefreshSearches() ).map( i -> {
+                try {
+
+                    final SearchRequestBuilder builder = esProvider.getClient().prepareSearch( alias.getReadAlias() )
+                                                                   .setTypes( IndexingUtils.ES_ENTITY_TYPE )
+                                                                   .setPostFilter( FilterBuilders
+                                                                       .termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+                                                                           entityId ) );
+
+
+                    return new IndexRefreshCommandInfo( builder.execute().get().getHits().totalHits() > 0,
+                        System.currentTimeMillis() - start );
                 }
-            }).doOnCompleted(() -> {
-                //clean up our data
-                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
-                DeIndexOperation deIndexRequest =
-                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+                catch ( Exception ee ) {
+                    logger.error( "Failed during refresh search for " + uuid, ee );
+                    throw new RuntimeException( "Failed during refresh search for " + uuid, ee );
+                }
+            } ).skipWhile( info -> !info.hasFinished() );
 
-                //delete the item
-                IndexOperationMessage indexOperationMessage =
-                    new IndexOperationMessage();
-                indexOperationMessage.addDeIndexRequest(deIndexRequest);
-                producer.put(indexOperationMessage);
 
-                refreshTimer.stop();
-            });
-        }
+        //chain it all together
 
-        private Observable<Boolean> refresh(final String[] indexes) {
+        //add the record, take it's last result.  On the last add, we then execute the refresh command
 
-            return Observable.create(subscriber -> {
-                try {
+        final Observable<IndexRefreshCommandInfo> refreshResults = addRecord
+
+            //after our add, run a refresh
+            .doOnNext( addResult -> {
 
-                    if (indexes.length == 0) {
-                        logger.debug("Not refreshing indexes. none found");
-                    }
-                    //Added For Graphite Metrics
-                    RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
-                    int failedShards = response.getFailedShards();
-                    int successfulShards = response.getSuccessfulShards();
-                    ShardOperationFailedException[] sfes = response.getShardFailures();
-                    if (sfes != null) {
-                        for (ShardOperationFailedException sfe : sfes) {
-                            logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
-                        }
-                    }
-                    logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
 
-                } catch (IndexMissingException e) {
-                    logger.error("Unable to refresh index. Waiting before sleeping.", e);
-                    throw e;
+                if ( indexes.length == 0 ) {
+                    logger.debug( "Not refreshing indexes. none found" );
+                }
+                //Added For Graphite Metrics
+                RefreshResponse response =
+                    esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
+                int failedShards = response.getFailedShards();
+                int successfulShards = response.getSuccessfulShards();
+                ShardOperationFailedException[] sfes = response.getShardFailures();
+                if ( sfes != null ) {
+                    for ( ShardOperationFailedException sfe : sfes ) {
+                        logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() );
+                    }
                 }
-                subscriber.onNext(true);
-                subscriber.onCompleted();
-            });
+                logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ),
+                    successfulShards, failedShards );
+            } )
+
+                //once the refresh is done execute the search
+            .flatMap( refreshCommandResult -> searchObservable )
+
+                //check when found
+            .doOnNext( found -> {
+                if ( !found.hasFinished() ) {
+                    logger.error( "Couldn't find record during refresh uuid: {} took ms:{} ", uuid,
+                        found.getExecutionTime() );
+                }
+                else {
+                    logger.info( "found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime() );
+                }
+            } ).doOnCompleted( () -> {
+                //clean up our data
+                String[] aliases = indexCache.getIndexes( alias, AliasedEntityIndex.AliasType.Read );
+                DeIndexOperation deIndexRequest =
+                    new DeIndexOperation( aliases, appScope, edge, entity.getId(), entity.getVersion() );
 
-        }
+                //delete the item
+                IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+                indexOperationMessage.addDeIndexRequest( deIndexRequest );
+                producer.put( indexOperationMessage );
+            } );
+
+
+        return Async.start( () -> 1 ).flatMap( intValue -> ObservableTimer.time( refreshResults, timer ) );
+    }
 }