You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/04 23:42:33 UTC
incubator-usergrid git commit: Updated index refresh to be more
procedural. Still a race condition on index processing and flush.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 9b939d15b -> 1e3de5725
Updated index refresh to be more procedural. Still a race condition on index processing and flush.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1e3de572
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1e3de572
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1e3de572
Branch: refs/heads/two-dot-o-dev
Commit: 1e3de5725ce2fff16e44a4b1b068269d197411eb
Parents: 9b939d1
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 4 15:42:22 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon May 4 15:42:22 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 2 -
.../usergrid/persistence/CollectionIT.java | 2 +
.../core/future/FutureObservable.java | 6 +-
.../persistence/index/IndexRefreshCommand.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 2 -
.../index/impl/EsIndexBufferConsumerImpl.java | 13 +-
.../index/impl/IndexBufferConsumer.java | 2 +-
.../index/impl/IndexRefreshCommandImpl.java | 170 +++++++++----------
8 files changed, 94 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6ffefe3..7d003cc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -579,8 +579,6 @@ public class CpEntityManager implements EntityManager {
// first, update entity index in its own collection scope
updateEntityMeter.mark();
- Timer.Context timer = updateEntityTimer.time();
-
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index fcdc11f..8c94d32 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -363,6 +363,8 @@ public class CollectionIT extends AbstractCoreIT {
app.refreshIndex();
+// Thread.sleep(500);
+
final Query query = Query.fromQL( "nickname = 'ed'" );
Results r = em.searchCollection( group, "users", query.withResultsLevel( Level.LINKED_PROPERTIES ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
index f86ffc1..06eed4d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/future/FutureObservable.java
@@ -27,13 +27,11 @@ import java.util.concurrent.FutureTask;
*/
public class FutureObservable<T> {
- private final T returnVal;
private final FutureTask<T> future;
public FutureObservable(final T returnVal) {
- this.returnVal = returnVal;
- future = new FutureTask<T>( () -> returnVal );
+ future = new FutureTask<>( () -> returnVal );
}
public void done() {
@@ -41,6 +39,6 @@ public class FutureObservable<T> {
}
public Observable<T> observable() {
- return !future.isDone() ? Observable.from(future) : Observable.just(returnVal);
+ return Observable.from(future);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
index 03be233..af7f814 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
@@ -28,7 +28,7 @@ public interface IndexRefreshCommand {
Observable<IndexRefreshCommandInfo> execute(String[] indexes);
- public static class IndexRefreshCommandInfo{
+ class IndexRefreshCommandInfo{
private final boolean hasFinished;
private final long executionTime;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 20a940f..904f58b 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -323,8 +323,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() {
refreshIndexMeter.mark();
- Observable future = producer.put(new IndexOperationMessage());
- future.toBlocking().last();
return indexRefreshCommand.execute(getUniqueIndexes());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index b2433bd..f1c493a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -108,7 +108,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
}
- public Observable put( IndexOperationMessage message ) {
+ public Observable<IndexOperationMessage> put( IndexOperationMessage message ) {
Preconditions.checkNotNull(message, "Message cannot be null");
indexSizeCounter.inc( message.getDeIndexRequests().size() );
indexSizeCounter.inc( message.getIndexRequests().size() );
@@ -129,7 +129,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//buffer on our new thread with a timeout
observable.buffer( indexFig.getIndexBufferSize(), indexFig.getIndexBufferTimeout(), TimeUnit.MILLISECONDS,
- Schedulers.newThread() ).flatMap( indexOpBuffer -> {
+ Schedulers.io() ).flatMap( indexOpBuffer -> {
//hand off to processor in new observable thread so we can continue to buffer faster
return Observable.just( indexOpBuffer ).flatMap(
@@ -199,13 +199,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//subscribe to the operations that generate requests on a new thread so that we can execute them quickly
//mark this as done
- return processedIndexOperations.doOnNext( processedIndexOp ->
- {
- processedIndexOp.done();
- }
- ).doOnError(t -> {
- log.error("Unable to ack futures", t);
- });
+ return processedIndexOperations.doOnNext( processedIndexOp -> processedIndexOp.done()
+ ).doOnError(t -> log.error("Unable to ack futures", t) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
index 6898f15..2906570 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBufferConsumer.java
@@ -34,5 +34,5 @@ public interface IndexBufferConsumer {
* @param message
* @return
*/
- Observable put(IndexOperationMessage message);
+ Observable<IndexOperationMessage> put(IndexOperationMessage message);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1e3de572/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 71a05a0..34f0e6e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -20,26 +20,21 @@
package org.apache.usergrid.persistence.index.impl;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import com.amazonaws.services.elastictranscoder.model.TimeSpan;
-import org.apache.usergrid.persistence.core.util.StringUtils;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.indices.IndexMissingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.util.StringUtils;
import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.IndexEdge;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -56,7 +51,6 @@ import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import rx.Observable;
-import rx.Subscriber;
import rx.util.async.Async;
@@ -72,13 +66,12 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
private final IndexBufferConsumer producer;
private final IndexFig indexFig;
private final Timer timer;
- private final RxTaskScheduler rxTaskScheduler;
+
@Inject
public IndexRefreshCommandImpl( IndexIdentifier indexIdentifier, EsProvider esProvider,
IndexBufferConsumer producer, IndexFig indexFig, MetricsFactory metricsFactory,
- final IndexCache indexCache, final RxTaskScheduler rxTaskScheduler ) {
-
+ final IndexCache indexCache ) {
this.timer = metricsFactory.getTimer( IndexRefreshCommandImpl.class, "index.refresh.timer" );
@@ -87,17 +80,16 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
this.producer = producer;
this.indexFig = indexFig;
this.indexCache = indexCache;
- this.rxTaskScheduler = rxTaskScheduler;
}
@Override
- public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
+ public Observable<IndexRefreshCommandInfo> execute( String[] indexes ) {
final long start = System.currentTimeMillis();
- Timer.Context refreshTimer = timer.time();
+
//id to hunt for
final UUID uuid = UUIDUtils.newTimeUUID();
final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
@@ -113,90 +105,96 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
//save the item
final IndexOperationMessage message = new IndexOperationMessage();
message.addIndexRequest( indexRequest );
- final Observable addRecord = producer.put(message);
- final Observable refresh = refresh(indexes);
+
+ //add the record to the index
+ final Observable<IndexOperationMessage> addRecord = producer.put( message );
+
+ //refresh the index
+ // final Observable<Boolean> refresh = refresh( indexes );
/**
* We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search
*/
//set our filter for entityId fieldname
- final SearchRequestBuilder builder =
- esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
- .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
-
-
- //start our processing immediately
- final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
- final Observable<IndexRefreshCommandInfo> infoObservable = Observable
- .range(0, indexFig.maxRefreshSearches())
- .map(i ->
- {
- try {
- return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
- } catch (Exception ee) {
- logger.error("Failed during refresh search for " + uuid, ee);
- throw new RuntimeException("Failed during refresh search for " + uuid, ee);
- }
- })
- .takeWhile(info -> info.hasFinished())
- .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
-
- final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
- combined.toBlocking().last();
- final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
- return info;
- },rxTaskScheduler.getAsyncIOScheduler()).call();
-
-
- return future.doOnNext(found -> {
- if (!found.hasFinished()) {
- logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
- } else {
- logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+
+
+ /**
+ * We want to search once we've added our record, then refreshed
+ */
+ final Observable<IndexRefreshCommandInfo> searchObservable =
+ Observable.range( 0, indexFig.maxRefreshSearches() ).map( i -> {
+ try {
+
+ final SearchRequestBuilder builder = esProvider.getClient().prepareSearch( alias.getReadAlias() )
+ .setTypes( IndexingUtils.ES_ENTITY_TYPE )
+ .setPostFilter( FilterBuilders
+ .termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+ entityId ) );
+
+
+ return new IndexRefreshCommandInfo( builder.execute().get().getHits().totalHits() > 0,
+ System.currentTimeMillis() - start );
}
- }).doOnCompleted(() -> {
- //clean up our data
- String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
- DeIndexOperation deIndexRequest =
- new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+ catch ( Exception ee ) {
+ logger.error( "Failed during refresh search for " + uuid, ee );
+ throw new RuntimeException( "Failed during refresh search for " + uuid, ee );
+ }
+ } ).skipWhile( info -> !info.hasFinished() );
- //delete the item
- IndexOperationMessage indexOperationMessage =
- new IndexOperationMessage();
- indexOperationMessage.addDeIndexRequest(deIndexRequest);
- producer.put(indexOperationMessage);
- refreshTimer.stop();
- });
- }
+ //chain it all together
- private Observable<Boolean> refresh(final String[] indexes) {
+ //add the record, take its last result. On the last add, we then execute the refresh command
- return Observable.create(subscriber -> {
- try {
+ final Observable<IndexRefreshCommandInfo> refreshResults = addRecord
+
+ //after our add, run a refresh
+ .doOnNext( addResult -> {
- if (indexes.length == 0) {
- logger.debug("Not refreshing indexes. none found");
- }
- //Added For Graphite Metrics
- RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
- int failedShards = response.getFailedShards();
- int successfulShards = response.getSuccessfulShards();
- ShardOperationFailedException[] sfes = response.getShardFailures();
- if (sfes != null) {
- for (ShardOperationFailedException sfe : sfes) {
- logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
- }
- }
- logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
- } catch (IndexMissingException e) {
- logger.error("Unable to refresh index. Waiting before sleeping.", e);
- throw e;
+ if ( indexes.length == 0 ) {
+ logger.debug( "Not refreshing indexes. none found" );
+ }
+ //Added For Graphite Metrics
+ RefreshResponse response =
+ esProvider.getClient().admin().indices().prepareRefresh( indexes ).execute().actionGet();
+ int failedShards = response.getFailedShards();
+ int successfulShards = response.getSuccessfulShards();
+ ShardOperationFailedException[] sfes = response.getShardFailures();
+ if ( sfes != null ) {
+ for ( ShardOperationFailedException sfe : sfes ) {
+ logger.error( "Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason() );
+ }
}
- subscriber.onNext(true);
- subscriber.onCompleted();
- });
+ logger.debug( "Refreshed indexes: {},success:{} failed:{} ", StringUtils.join( indexes, ", " ),
+ successfulShards, failedShards );
+ } )
+
+ //once the refresh is done execute the search
+ .flatMap( refreshCommandResult -> searchObservable )
+
+ //check when found
+ .doOnNext( found -> {
+ if ( !found.hasFinished() ) {
+ logger.error( "Couldn't find record during refresh uuid: {} took ms:{} ", uuid,
+ found.getExecutionTime() );
+ }
+ else {
+ logger.info( "found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime() );
+ }
+ } ).doOnCompleted( () -> {
+ //clean up our data
+ String[] aliases = indexCache.getIndexes( alias, AliasedEntityIndex.AliasType.Read );
+ DeIndexOperation deIndexRequest =
+ new DeIndexOperation( aliases, appScope, edge, entity.getId(), entity.getVersion() );
- }
+ //delete the item
+ IndexOperationMessage indexOperationMessage = new IndexOperationMessage();
+ indexOperationMessage.addDeIndexRequest( deIndexRequest );
+ producer.put( indexOperationMessage );
+ } );
+
+
+ return Async.start( () -> 1 ).flatMap( intValue -> ObservableTimer.time( refreshResults, timer ) );
+ }
}