You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/02/25 20:56:33 UTC
[04/10] usergrid git commit: Reduce SQS hop for entity write/update
indexing events.
Reduce SQS hop for entity write/update indexing events.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b4634dc4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b4634dc4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b4634dc4
Branch: refs/heads/release-2.1.1
Commit: b4634dc42f767a982892362bbd4aa66059bf1998
Parents: d4c7a3c
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Feb 19 13:35:48 2016 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Feb 19 13:35:48 2016 -0800
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 4 +-
.../asyncevents/AmazonAsyncEventService.java | 844 -------------------
.../asyncevents/AsyncEventService.java | 5 +-
.../asyncevents/AsyncEventServiceImpl.java | 798 ++++++++++++++++++
.../asyncevents/AsyncIndexProvider.java | 4 +-
.../asyncevents/EventBuilder.java | 12 +-
.../asyncevents/EventBuilderImpl.java | 15 +-
.../asyncevents/IndexDocNotFoundException.java | 37 +
.../corepersistence/index/IndexServiceImpl.java | 4 +-
.../read/search/CandidateEntityFilter.java | 10 +-
.../index/AmazonAsyncEventServiceTest.java | 103 ---
.../index/AsyncEventServiceImplTest.java | 103 +++
.../index/AsyncIndexServiceTest.java | 3 +-
.../index/impl/EsIndexProducerImpl.java | 5 +-
.../usergrid/persistence/queue/QueueFig.java | 4 +
.../usergrid/services/ServiceManager.java | 5 +-
16 files changed, 972 insertions(+), 984 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 6c2ef0b..b677f79 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -590,7 +590,7 @@ public class CpEntityManager implements EntityManager {
// update in all containing collections and connection indexes
- indexService.queueEntityIndexUpdate( applicationScope, cpEntity );
+ indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0);
}
@@ -1107,7 +1107,7 @@ public class CpEntityManager implements EntityManager {
//Adding graphite metrics
- indexService.queueEntityIndexUpdate(applicationScope, cpEntity);
+ indexService.queueEntityIndexUpdate(applicationScope, cpEntity, 0);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
deleted file mode 100644
index 00dc69a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.usergrid.persistence.index.impl.*;
-import org.elasticsearch.action.index.IndexRequest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
-import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-/**
- * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner.
- *
- * 1. Produce. Keep the code in the handle as is
- * 2. Consume: Move the code into a refactored system
- * 2.1 A central dispatcher
- * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into it's own
- * impl that will then emit a stream of batch operations to perform
- * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler
- * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
- * 2.5 The receive batch handler will execute the batch operations
- *
- * TODO determine how we error handle?
- *
- */
-@Singleton
-public class AmazonAsyncEventService implements AsyncEventService {
-
-
- private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
-
- // SQS maximum receive messages is 10
- public int MAX_TAKE = 10;
- public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
-
- private final QueueManager queue;
- private final IndexProcessorFig indexProcessorFig;
- private final QueueFig queueFig;
- private final IndexProducer indexProducer;
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final IndexLocationStrategyFactory indexLocationStrategyFactory;
- private final EntityIndexFactory entityIndexFactory;
- private final EventBuilder eventBuilder;
- private final RxTaskScheduler rxTaskScheduler;
-
- private final Timer readTimer;
- private final Timer writeTimer;
- private final Timer ackTimer;
-
- /**
- * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
- */
- private final Object mutex = new Object();
-
- private final Counter indexErrorCounter;
- private final AtomicLong counter = new AtomicLong();
- private final AtomicLong inFlight = new AtomicLong();
- private final Histogram messageCycle;
- private final MapManager esMapPersistence;
-
- //the actively running subscription
- private List<Subscription> subscriptions = new ArrayList<>();
-
-
- @Inject
- public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig,
- final IndexProducer indexProducer,
- final MetricsFactory metricsFactory,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory,
- final EventBuilder eventBuilder,
- final MapManagerFactory mapManagerFactory,
- final QueueFig queueFig,
- @EventExecutionScheduler
- final RxTaskScheduler rxTaskScheduler ) {
- this.indexProducer = indexProducer;
-
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.indexLocationStrategyFactory = indexLocationStrategyFactory;
- this.entityIndexFactory = entityIndexFactory;
- this.eventBuilder = eventBuilder;
-
- final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
-
- this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
-
- this.rxTaskScheduler = rxTaskScheduler;
-
- QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
- this.queue = queueManagerFactory.getQueueManager(queueScope);
-
- this.indexProcessorFig = indexProcessorFig;
- this.queueFig = queueFig;
-
- this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
- this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
- this.ackTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.ack");
- this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
- this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
-
-
- //wire up the gauge of inflight message
- metricsFactory.addGauge(AmazonAsyncEventService.class, "async-event.inflight", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return inFlight.longValue();
- }
- });
-
- start();
- }
-
-
- /**
- * Offer the EntityIdScope to SQS
- */
- private void offer(final Serializable operation) {
- final Timer.Context timer = this.writeTimer.time();
-
- try {
- //signal to SQS
- this.queue.sendMessage( operation );
- } catch (IOException e) {
- throw new RuntimeException("Unable to queue message", e);
- } finally {
- timer.stop();
- }
- }
-
-
- private void offerTopic( final Serializable operation ) {
- final Timer.Context timer = this.writeTimer.time();
-
- try {
- //signal to SQS
- this.queue.sendMessageToTopic( operation );
- }
- catch ( IOException e ) {
- throw new RuntimeException( "Unable to queue message", e );
- }
- finally {
- timer.stop();
- }
- }
-
- private void offerBatch(final List operations){
- final Timer.Context timer = this.writeTimer.time();
-
- try {
- //signal to SQS
- this.queue.sendMessages(operations);
- } catch (IOException e) {
- throw new RuntimeException("Unable to queue message", e);
- } finally {
- timer.stop();
- }
- }
-
-
- /**
- * Take message from SQS
- */
- private List<QueueMessage> take() {
-
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return queue.getMessages(MAX_TAKE,
- indexProcessorFig.getIndexQueueVisibilityTimeout(),
- indexProcessorFig.getIndexQueueTimeout(),
- AsyncEvent.class);
- }
- //stop our timer
- finally {
- timer.stop();
- }
- }
-
-
-
- /**
- * Ack message in SQS
- */
- public void ack(final List<QueueMessage> messages) {
-
- final Timer.Context timer = this.ackTimer.time();
-
- try{
- queue.commitMessages( messages );
-
- //decrement our in-flight counter
- inFlight.decrementAndGet();
-
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }finally {
- timer.stop();
- }
-
-
- }
-
- /**
- * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
- * @param messages
- * @return
- */
- private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
-
- if (logger.isDebugEnabled()) {
- logger.debug("callEventHandlers with {} message", messages.size());
- }
-
- Stream<IndexEventResult> indexEventResults = messages.stream().map(message -> {
- AsyncEvent event = null;
- try {
- event = (AsyncEvent) message.getBody();
- } catch (ClassCastException cce) {
- logger.error("Failed to deserialize message body", cce);
- }
-
- if (event == null) {
- logger.error("AsyncEvent type or event is null!");
- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(),
- System.currentTimeMillis());
- }
-
- final AsyncEvent thisEvent = event;
-
- if (logger.isDebugEnabled()) {
- logger.debug("Processing {} event", event);
- }
-
- try {
- //check for empty sets if this is true
- boolean validateEmptySets = true;
- Observable<IndexOperationMessage> indexoperationObservable;
- //merge each operation to a master observable;
- if ( event instanceof EdgeDeleteEvent ) {
- indexoperationObservable = handleEdgeDelete( message );
- }
- else if ( event instanceof EdgeIndexEvent ) {
- indexoperationObservable = handleEdgeIndex( message );
- }
- else if ( event instanceof EntityDeleteEvent ) {
- indexoperationObservable = handleEntityDelete( message );
- validateEmptySets = false; // do not check this one for an empty set b/c it can be empty
-
- }
- else if ( event instanceof EntityIndexEvent ) {
- indexoperationObservable = handleEntityIndexUpdate( message );
- }
- else if ( event instanceof InitializeApplicationIndexEvent ) {
- //does not return observable
- handleInitializeApplicationIndex(event, message);
- indexoperationObservable = Observable.just(new IndexOperationMessage());
- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else if (event instanceof ElasticsearchIndexEvent) {
- handleIndexOperation((ElasticsearchIndexEvent) event);
- indexoperationObservable = Observable.just(new IndexOperationMessage());
- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else {
- throw new Exception("Unknown EventType");//TODO: print json instead
- }
-
- //collect all of the
- IndexOperationMessage indexOperationMessage = indexoperationObservable
- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
- .toBlocking().lastOrDefault(null);
-
- if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
- logger.error("Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
- message.getStringBody());
- throw new Exception("Received empty index sequence.");
- }
-
- //return type that can be indexed and ack'd later
- return new IndexEventResult(Optional.fromNullable(message),
- Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
- } catch (Exception e) {
- logger.error("Failed to index message: {} {}", message.getMessageId(), message.getStringBody(), e);
- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(),
- event.getCreationTime());
- }
- });
-
-
- return indexEventResults.collect(Collectors.toList());
- }
-
- @Override
- public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
- applicationScope);
- offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
- new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
- }
-
-
- @Override
- public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
- final Entity entity) {
-
- offer(new EntityIndexEvent(queueFig.getPrimaryRegion(),new EntityIdScope(applicationScope, entity.getId()), 0));
- }
-
-
- public Observable<IndexOperationMessage> handleEntityIndexUpdate(final QueueMessage message) {
-
- Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
-
- final AsyncEvent event = ( AsyncEvent ) message.getBody();
-
- Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
- Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass()));
-
- final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event;
-
-
- //process the entity immediately
- //only process the same version, otherwise ignore
- final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope();
- final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
- final Id entityId = entityIdScope.getId();
- final long updatedAfter = entityIndexEvent.getUpdatedAfter();
-
- final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter);
-
- final Observable<IndexOperationMessage> observable = eventBuilder.buildEntityIndex( entityIndexOperation );
- return observable;
- }
-
-
- @Override
- public void queueNewEdge(final ApplicationScope applicationScope,
- final Entity entity,
- final Edge newEdge) {
-
- EdgeIndexEvent operation = new EdgeIndexEvent(queueFig.getPrimaryRegion(), applicationScope, entity.getId(), newEdge);
-
- offer( operation );
- }
-
- public Observable<IndexOperationMessage> handleEdgeIndex(final QueueMessage message) {
-
- Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
-
- final AsyncEvent event = (AsyncEvent) message.getBody();
-
- Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" );
- Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass()));
-
- final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event;
-
- final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope();
- final Edge edge = edgeIndexEvent.getEdge();
-
-
-
- final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
- entity -> eventBuilder.buildNewEdge(applicationScope, entity, edge));
- return edgeIndexObservable;
- }
-
- @Override
- public void queueDeleteEdge(final ApplicationScope applicationScope,
- final Edge edge) {
-
- offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
- }
-
- public Observable<IndexOperationMessage> handleEdgeDelete(final QueueMessage message) {
-
- Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
-
- final AsyncEvent event = (AsyncEvent) message.getBody();
-
- Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
- Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
-
-
- final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
-
- final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
- final Edge edge = edgeDeleteEvent.getEdge();
-
- if (logger.isDebugEnabled()) {
- logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
- }
-
- return eventBuilder.buildDeleteEdge(applicationScope, edge);
- }
-
-
- @Override
- public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
-
- offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
- }
-
-
- /**
- * Queue up an indexOperationMessage for multi region execution
- * @param indexOperationMessage
- */
- public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
-
- // don't try to produce something with nothing
- if(indexOperationMessage.isEmpty()){
- return;
- }
-
- final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
-
- final UUID newMessageId = UUIDGenerator.newTimeUUID();
-
- final int expirationTimeInSeconds =
- ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
-
- //write to the map in ES
- esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
-
-
-
- //now queue up the index message
-
- final ElasticsearchIndexEvent elasticsearchIndexEvent =
- new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
-
- //send to the topic so all regions index the batch
-
- offerTopic( elasticsearchIndexEvent );
- }
-
- public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
- Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
-
- final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
-
- Preconditions.checkNotNull( messageId, "messageId must not be null" );
-
-
- //load the entity
-
- final String message = esMapPersistence.getString( messageId.toString() );
-
- final IndexOperationMessage indexOperationMessage;
-
- if(message == null){
- logger.warn( "Received message with id {} to process, unable to find it, reading with higher consistency level",
- messageId);
-
- final String highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
-
- if(highConsistency == null){
- logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level",
- messageId);
-
- throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
- }
-
- indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
-
- } else{
- indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
- }
-
- initializeEntityIndexes(indexOperationMessage);
-
- //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message
- //so we'll let compaction on column expiration handle deletion
-
- //read the value from the string
-
- Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
- Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
-
-
- //now execute it
- indexProducer.put(indexOperationMessage).toBlocking().last();
-
- }
-
- /**
- * this method will call initialize for each message, since we are caching the entity indexes,
- * we don't worry about aggregating by app id
- * @param indexOperationMessage
- */
- private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
-
- // create a set so we can have a unique list of appIds for which we call createEntityIndex
- Set<UUID> appIds = new HashSet<>();
-
- // loop through all indexRequests and add the appIds to the set
- indexOperationMessage.getIndexRequests().forEach(req -> {
- UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- appIds.add(appId);
- });
-
- // loop through all deindexRequests and add the appIds to the set
- indexOperationMessage.getDeIndexRequests().forEach(req -> {
- UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
- appIds.add(appId);
- });
-
- // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
- appIds.forEach(appId -> {
- ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
- entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
- }
- );
- }
-
-
- @Override
- public long getQueueDepth() {
- return queue.getQueueDepth();
- }
-
- public Observable<IndexOperationMessage> handleEntityDelete(final QueueMessage message) {
-
- Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
-
- final AsyncEvent event = (AsyncEvent) message.getBody();
- Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
- Preconditions.checkArgument( event instanceof EntityDeleteEvent,
- String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
-
-
- final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
- final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
- final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
-
- if (logger.isDebugEnabled())
- logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
-
- final EventBuilderImpl.EntityDeleteResults
- entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
-
-
- // Delete the entities and remove from graph separately
- entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
-
- entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
-
- return entityDeleteResults.getIndexObservable();
- }
-
-
- public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
- Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
- Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
-
- final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
- ( InitializeApplicationIndexEvent ) event;
-
- final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
- final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
- index.initialize();
- }
-
- /**
- * Loop through and start the workers
- */
- public void start() {
- final int count = indexProcessorFig.getWorkerCount();
-
- for (int i = 0; i < count; i++) {
- startWorker();
- }
- }
-
-
- /**
- * Stop the workers
- */
- public void stop() {
- synchronized (mutex) {
- //stop consuming
-
- for (final Subscription subscription : subscriptions) {
- subscription.unsubscribe();
- }
- }
- }
-
-
- private void startWorker() {
- synchronized (mutex) {
-
- Observable<List<QueueMessage>> consumer =
- Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
- @Override
- public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
-
- //name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
-
- List<QueueMessage> drainList = null;
-
- do {
- try {
- drainList = take();
- //emit our list in it's entity to hand off to a worker pool
- subscriber.onNext(drainList);
-
- //take since we're in flight
- inFlight.addAndGet( drainList.size() );
- }
- catch ( Throwable t ) {
- final long sleepTime = indexProcessorFig.getFailureRetryTime();
-
- logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
-
- if ( drainList != null ) {
- inFlight.addAndGet( -1 * drainList.size() );
- }
-
-
- try {
- Thread.sleep( sleepTime );
- }
- catch ( InterruptedException ie ) {
- //swallow
- }
-
- indexErrorCounter.inc();
- }
- }
- while ( true );
- }
- } ) //this won't block our read loop, just reads and proceeds
- .flatMap( sqsMessages -> {
-
- //do this on a different schedule, and introduce concurrency with flatmap for faster processing
- return Observable.just( sqsMessages )
-
- .map( messages -> {
- if ( messages == null || messages.size() == 0 ) {
- return null;
- }
-
- try {
- List<IndexEventResult> indexEventResults =
- callEventHandlers( messages );
- List<QueueMessage> messagesToAck =
- submitToIndex( indexEventResults );
- if ( messagesToAck == null || messagesToAck.size() == 0 ) {
- logger.error(
- "No messages came back from the queue operation, should have seen {} messages",
- messages.size() );
- return messagesToAck;
- }
- if ( messagesToAck.size() < messages.size() ) {
- logger.error( "Missing messages from queue post operation",
- messages, messagesToAck );
- }
- //ack each message, but only if we didn't error.
- ack( messagesToAck );
- return messagesToAck;
- }
- catch ( Exception e ) {
- logger.error( "failed to ack messages to sqs", e );
- return null;
- //do not rethrow so we can process all of them
- }
- } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
- //end flatMap
- }, indexProcessorFig.getEventConcurrencyFactor() );
-
- //start in the background
-
- final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
-
- subscriptions.add(subscription);
- }
- }
-
- /**
- * Submit results to index and return the queue messages to be ack'd
- * @param indexEventResults
- * @return
- */
- private List<QueueMessage> submitToIndex( List<IndexEventResult> indexEventResults) {
- //if nothing came back then return null
- if(indexEventResults==null){
- return null;
- }
-
- final IndexOperationMessage combined = new IndexOperationMessage();
-
- //stream and filer the messages
- List<QueueMessage> messagesToAck = indexEventResults.stream()
- .map(indexEventResult -> {
- //collect into the index submission
- if (indexEventResult.getIndexOperationMessage().isPresent()) {
- combined.ingest(indexEventResult.getIndexOperationMessage().get());
- }
- return indexEventResult;
- })
- //filter out the ones that need to be ack'd
- .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
- .map(indexEventResult -> {
- //record the cycle time
- messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
- return indexEventResult;
- })
- //ack after successful completion of the operation.
- .map(result -> result.getQueueMessage().get())
- .collect(Collectors.toList());
-
- queueIndexOperationMessage( combined );
-
- return messagesToAck;
- }
-
- public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
- //change to id scope to avoid serialization issues
- offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
- }
-
- public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
-
- List batch = new ArrayList<EdgeScope>();
- for ( EdgeScope e : edges){
- //change to id scope to avoid serialization issues
- batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
- }
- offerBatch( batch );
- }
-
-
- public class IndexEventResult{
- private final Optional<QueueMessage> queueMessage;
- private final Optional<IndexOperationMessage> indexOperationMessage;
- private final long creationTime;
-
-
- public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage, long creationTime){
-
- this.queueMessage = queueMessage;
- this.indexOperationMessage = indexOperationMessage;
-
- this.creationTime = creationTime;
- }
-
-
- public Optional<QueueMessage> getQueueMessage() {
- return queueMessage;
- }
-
- public Optional<IndexOperationMessage> getIndexOperationMessage() {
- return indexOperationMessage;
- }
-
- public long getCreationTime() {
- return creationTime;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index dbf8996..288fb12 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -22,9 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -48,8 +46,9 @@ public interface AsyncEventService extends ReIndexAction {
* After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
* @param applicationScope
* @param entity The entity to index. Should be fired when an entity is updated
+ * @param updatedAfter
*/
- void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity);
+ void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter);
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
new file mode 100644
index 0000000..e101761
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -0,0 +1,798 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.usergrid.persistence.index.impl.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+/**
+ * TODO, this whole class is becoming a nightmare. We need to remove all consume from this class and refactor it into the following manner.
+ *
+ * 1. Produce. Keep the code in the handle as is
+ * 2. Consume: Move the code into a refactored system
+ * 2.1 A central dispatcher
+ * 2.2 An interface that produces an observable of type BatchOperation. Any handler will be refactored into its own
+ * impl that will then emit a stream of batch operations to perform
+ * 2.3 The central dispatcher will then subscribe to these events and merge them. Handing them off to a batch handler
+ * 2.4 The batch handler will roll up the operations into a batch size, and then queue them
+ * 2.5 The receive batch handler will execute the batch operations
+ *
+ * TODO determine how we error handle?
+ *
+ */
+@Singleton
+public class AsyncEventServiceImpl implements AsyncEventService {
+
+
+ private static final Logger logger = LoggerFactory.getLogger(AsyncEventServiceImpl.class);
+
+ // SQS maximum receive messages is 10
+ // NOTE(review): public mutable field; AsyncIndexProvider overwrites this to 1000 for the LOCAL
+ // impl (see AsyncIndexProvider diff below) -- consider a constructor parameter instead.
+ public int MAX_TAKE = 10;
+ public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
+
+ private final QueueManager queue;
+ private final IndexProcessorFig indexProcessorFig;
+ private final QueueFig queueFig;
+ private final IndexProducer indexProducer;
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final EntityIndexFactory entityIndexFactory;
+ private final EventBuilder eventBuilder;
+ private final RxTaskScheduler rxTaskScheduler;
+
+ // metrics for queue read/write/ack latency
+ private final Timer readTimer;
+ private final Timer writeTimer;
+ private final Timer ackTimer;
+
+ /**
+ * This mutex is used to start/stop workers to ensure we're not concurrently modifying our subscriptions
+ */
+ private final Object mutex = new Object();
+
+ private final Counter indexErrorCounter;
+ // counter used only to name consumer threads uniquely
+ private final AtomicLong counter = new AtomicLong();
+ // number of queue messages taken but not yet ack'd (exposed as a gauge)
+ private final AtomicLong inFlight = new AtomicLong();
+ private final Histogram messageCycle;
+ // map persistence (backed by ES map manager) used to stage serialized IndexOperationMessage batches
+ private final MapManager esMapPersistence;
+
+ //the actively running subscription
+ private List<Subscription> subscriptions = new ArrayList<>();
+
+ /**
+ * Wires the queue manager, map persistence, metrics and event builder, then immediately
+ * starts the consumer workers.
+ * NOTE(review): calling start() from the constructor launches background threads before the
+ * object is fully constructed -- confirm this is intentional / safe under the DI container.
+ */
+ @Inject
+ public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory,
+ final IndexProcessorFig indexProcessorFig,
+ final IndexProducer indexProducer,
+ final MetricsFactory metricsFactory,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
+ final EventBuilder eventBuilder,
+ final MapManagerFactory mapManagerFactory,
+ final QueueFig queueFig,
+ @EventExecutionScheduler
+ final RxTaskScheduler rxTaskScheduler ) {
+ this.indexProducer = indexProducer;
+
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ this.eventBuilder = eventBuilder;
+
+ // map scope lives in the management app; stores staged index batches by message id
+ final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
+
+ this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
+ this.rxTaskScheduler = rxTaskScheduler;
+
+ QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ this.queue = queueManagerFactory.getQueueManager(queueScope);
+
+ this.indexProcessorFig = indexProcessorFig;
+ this.queueFig = queueFig;
+
+ this.writeTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.write");
+ this.readTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.read");
+ this.ackTimer = metricsFactory.getTimer(AsyncEventServiceImpl.class, "async_event.ack");
+ this.indexErrorCounter = metricsFactory.getCounter(AsyncEventServiceImpl.class, "async_event.error");
+ this.messageCycle = metricsFactory.getHistogram(AsyncEventServiceImpl.class, "async_event.message_cycle");
+
+
+ //wire up the gauge of in-flight messages
+ metricsFactory.addGauge(AsyncEventServiceImpl.class, "async-event.inflight", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return inFlight.longValue();
+ }
+ });
+
+ start();
+ }
+
+
+ /**
+ * Offer the EntityIdScope to SQS
+ * Timed via writeTimer; wraps the checked IOException from the queue in a RuntimeException.
+ */
+ private void offer(final Serializable operation) {
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessage( operation );
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to queue message", e);
+ } finally {
+ timer.stop();
+ }
+ }
+
+
+ /**
+ * Publish the operation to the SNS topic so it fans out to ALL regions
+ * (vs offer(), which sends to the single-region queue).
+ */
+ private void offerTopic( final Serializable operation ) {
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessageToTopic( operation );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to queue message", e );
+ }
+ finally {
+ timer.stop();
+ }
+ }
+
+ /**
+ * Send a whole list of operations to the queue in one call.
+ * NOTE(review): raw List parameter -- should be List<? extends Serializable>.
+ */
+ private void offerBatch(final List operations){
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessages(operations);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to queue message", e);
+ } finally {
+ timer.stop();
+ }
+ }
+
+
+ /**
+ * Take message from SQS
+ * Reads up to MAX_TAKE messages using the visibility/poll timeouts from IndexProcessorFig;
+ * bodies are deserialized as AsyncEvent.
+ */
+ private List<QueueMessage> take() {
+
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return queue.getMessages(MAX_TAKE,
+ indexProcessorFig.getIndexQueueVisibilityTimeout(),
+ indexProcessorFig.getIndexQueueTimeout(),
+ AsyncEvent.class);
+ }
+ //stop our timer
+ finally {
+ timer.stop();
+ }
+ }
+
+
+
+ /**
+ * Ack message in SQS
+ * Commits (deletes) the given messages from the queue.
+ * NOTE(review): inFlight is decremented by 1 per call, but the consumer loop increments it
+ * by drainList.size() per batch (see startWorker). The in-flight gauge will drift upward;
+ * this should probably be inFlight.addAndGet(-messages.size()).
+ */
+ public void ack(final List<QueueMessage> messages) {
+
+ final Timer.Context timer = this.ackTimer.time();
+
+ try{
+ queue.commitMessages( messages );
+
+ //decrement our in-flight counter
+ inFlight.decrementAndGet();
+
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }finally {
+ timer.stop();
+ }
+
+
+ }
+
+ /**
+ * calls the event handlers and returns a result with information on whether it needs to be ack'd and whether it needs to be indexed
+ * Dispatches each message to the handler matching its event type; an IndexEventResult with an
+ * absent QueueMessage means "do not ack" (the message will become visible again and be retried).
+ * Processing uses parallelStream, so handlers run concurrently on the common fork-join pool.
+ * @param messages
+ * @return one IndexEventResult per input message
+ */
+ private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("callEventHandlers with {} message", messages.size());
+ }
+
+ Stream<IndexEventResult> indexEventResults = messages.parallelStream().map(message ->
+
+ {
+ AsyncEvent event = null;
+ try {
+ event = (AsyncEvent) message.getBody();
+
+ } catch (ClassCastException cce) {
+ logger.error("Failed to deserialize message body", cce);
+ return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+ }
+
+ if (event == null) {
+ logger.error("AsyncEvent type or event is null!");
+ return new IndexEventResult(Optional.absent(), System.currentTimeMillis());
+ }
+
+ final AsyncEvent thisEvent = event;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing {} event", event);
+ }
+
+ try {
+
+ // deletes are 2-part, actual IO to delete data, then queue up a de-index
+ if ( event instanceof EdgeDeleteEvent ) {
+
+ handleEdgeDelete( message );
+ }
+ // deletes are 2-part, actual IO to delete data, then queue up a de-index
+ else if ( event instanceof EntityDeleteEvent ) {
+
+ handleEntityDelete( message );
+ }
+ // application initialization has special logic, therefore a special event type
+ else if ( event instanceof InitializeApplicationIndexEvent ) {
+
+ handleInitializeApplicationIndex(event, message);
+ }
+ // this is the main event that pulls the index doc from map persistence and hands to the index producer
+ else if (event instanceof ElasticsearchIndexEvent) {
+
+ handleIndexOperation((ElasticsearchIndexEvent) event);
+
+ } else {
+
+ throw new Exception("Unknown EventType for message: "+ message.getStringBody());
+ }
+
+
+ //return type that can be indexed and ack'd later
+ return new IndexEventResult(Optional.of(message), thisEvent.getCreationTime());
+
+ } catch (IndexDocNotFoundException e){
+
+ // this exception is thrown when we wait before trying quorum read on map persistence.
+ // return empty event result so the event's message doesn't get ack'd
+ logger.info(e.getMessage());
+ return new IndexEventResult(Optional.absent(), event.getCreationTime());
+
+ } catch (Exception e) {
+
+ // if the event fails to process, log the message and return empty event result so it doesn't get ack'd
+ logger.error("Failed to process message: {} {}", message.getMessageId(), message.getStringBody(), e);
+ return new IndexEventResult(Optional.absent(), event.getCreationTime());
+ }
+ });
+
+
+ return indexEventResults.collect(Collectors.toList());
+ }
+
+ // Broadcasts an index-initialization event to ALL regions via the topic so each region
+ // creates the application's index/aliases locally.
+ @Override
+ public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+ IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
+ applicationScope);
+ offerTopic( new InitializeApplicationIndexEvent( queueFig.getPrimaryRegion(),
+ new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
+ }
+
+
+ // Per this commit ("Reduce SQS hop"): instead of queueing an EntityIndexEvent and building the
+ // index docs on the consumer side, the index batch is built synchronously here (blocking on the
+ // event builder) and only the resulting IndexOperationMessage is staged and queued.
+ @Override
+ public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
+ final Entity entity, long updatedAfter) {
+
+
+ final EntityIndexOperation entityIndexOperation =
+ new EntityIndexOperation( applicationScope, entity.getId(), updatedAfter);
+
+ final IndexOperationMessage indexMessage =
+ eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
+
+ queueIndexOperationMessage( indexMessage );
+
+ }
+
+
+ // Builds the index operations for a newly-created edge synchronously (loading the entity
+ // first), then queues the resulting batch. queueIndexOperationMessage tolerates a null/empty
+ + // result, so lastOrDefault(null) is safe here.
+ @Override
+ public void queueNewEdge(final ApplicationScope applicationScope,
+ final Entity entity,
+ final Edge newEdge) {
+
+ final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+ final IndexOperationMessage indexMessage = ecm.load( entity.getId() )
+ .flatMap( loadedEntity -> eventBuilder.buildNewEdge(applicationScope, entity, newEdge) )
+ .toBlocking().lastOrDefault(null);
+
+ queueIndexOperationMessage( indexMessage );
+
+ }
+
+
+ // Edge deletes still take the queue hop: an EdgeDeleteEvent is enqueued and processed later
+ // by handleEdgeDelete (deletes are 2-part: IO delete, then de-index).
+ @Override
+ public void queueDeleteEdge(final ApplicationScope applicationScope,
+ final Edge edge) {
+
+ offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
+ }
+
+ /**
+ * Consumer-side handler for EdgeDeleteEvent: builds the de-index batch for the deleted edge
+ * and re-queues it as an ElasticsearchIndexEvent.
+ * NOTE(review): the second Preconditions check re-validates 'message', not the extracted
+ * event body its text describes -- likely copy/paste; should check 'event' for null.
+ */
+ public void handleEdgeDelete(final QueueMessage message) {
+
+ Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
+
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" );
+ Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass()));
+
+
+ final EdgeDeleteEvent edgeDeleteEvent = ( EdgeDeleteEvent ) event;
+
+ final ApplicationScope applicationScope = edgeDeleteEvent.getApplicationScope();
+ final Edge edge = edgeDeleteEvent.getEdge();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
+ }
+
+ IndexOperationMessage indexMessage =
+ eventBuilder.buildDeleteEdge(applicationScope, edge).toBlocking().lastOrDefault(null);
+
+ queueIndexOperationMessage(indexMessage);
+
+ }
+
+
+
+ /**
+ * Queue up an indexOperationMessage for multi region execution
+ * The serialized batch is written to map persistence under a new time-UUID (with a TTL from
+ * IndexProcessorFig), then only that UUID is published to the topic; each region's consumer
+ * re-reads the batch from the map in handleIndexOperation.
+ * @param indexOperationMessage batch to stage and announce; null/empty batches are ignored
+ */
+ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+ // don't try to produce something with nothing
+ if(indexOperationMessage == null || indexOperationMessage.isEmpty()){
+ return;
+ }
+
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+ final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+ final int expirationTimeInSeconds =
+ ( int ) TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() );
+
+ //write to the map in ES
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds );
+
+
+
+ //now queue up the index message
+
+ final ElasticsearchIndexEvent elasticsearchIndexEvent =
+ new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId );
+
+ //send to the topic so all regions index the batch
+
+ offerTopic( elasticsearchIndexEvent );
+ }
+
+ /**
+ * Consumer-side handler for ElasticsearchIndexEvent: loads the staged IndexOperationMessage
+ * from map persistence by batch id and hands it to the index producer.
+ * If the batch isn't found yet, an IndexDocNotFoundException is thrown (message is NOT ack'd,
+ * so it retries) until the local-quorum timeout elapses; after that a high-consistency read is
+ * attempted, and a hard failure is raised if the batch still can't be found.
+ */
+ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+ final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+ Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+ //load the entity
+
+ final String message = esMapPersistence.getString( messageId.toString() );
+
+ final IndexOperationMessage indexOperationMessage;
+
+ if(message == null) {
+
+ // only escalate to a quorum read once the event is older than the local-quorum timeout
+ if ( System.currentTimeMillis() > elasticsearchIndexEvent.getCreationTime() + queueFig.getLocalQuorumTimeout() ) {
+
+ logger.warn("Received message with id {} to process, unable to find it, reading with higher consistency level",
+ messageId);
+
+ final String highConsistency = esMapPersistence.getStringHighConsistency(messageId.toString());
+
+ if (highConsistency == null) {
+ logger.error("Unable to find the ES batch with id {} to process at a higher consistency level",
+ messageId);
+
+ throw new RuntimeException("Unable to find the ES batch to process with message id " + messageId);
+ }
+
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString(highConsistency, IndexOperationMessage.class);
+
+ } else{
+
+ // too early for quorum read -- caller treats this as "retry later" and does not ack
+ throw new IndexDocNotFoundException(elasticsearchIndexEvent.getIndexBatchId());
+
+ }
+
+ } else{
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
+ }
+
+ // make sure the per-app entity indexes/aliases exist before writing the batch
+ initializeEntityIndexes(indexOperationMessage);
+
+ //NOTE that we intentionally do NOT delete from the map. We can't know when all regions have consumed the message
+ //so we'll let compaction on column expiration handle deletion
+
+ //read the value from the string
+
+ Preconditions.checkNotNull( indexOperationMessage, "indexOperationMessage cannot be null" );
+ Preconditions.checkArgument( !indexOperationMessage.isEmpty() , "queued indexOperationMessage messages should not be empty" );
+
+
+ //now execute it
+ indexProducer.put(indexOperationMessage).toBlocking().last();
+
+ }
+
+ /**
+ * this method will call initialize for each message, since we are caching the entity indexes,
+ * we don't worry about aggregating by app id
+ * Collects the unique application ids referenced by the batch's index and de-index requests
+ * and ensures an entity index (with aliases) exists for each before the batch is executed.
+ * @param indexOperationMessage
+ */
+ private void initializeEntityIndexes(final IndexOperationMessage indexOperationMessage) {
+
+ // create a set so we can have a unique list of appIds for which we call createEntityIndex
+ Set<UUID> appIds = new HashSet<>();
+
+ // loop through all indexRequests and add the appIds to the set
+ indexOperationMessage.getIndexRequests().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ appIds.add(appId);
+ });
+
+ // loop through all deindexRequests and add the appIds to the set
+ indexOperationMessage.getDeIndexRequests().forEach(req -> {
+ UUID appId = IndexingUtils.getApplicationIdFromIndexDocId(req.documentId);
+ appIds.add(appId);
+ });
+
+ // for each of the appIds in the unique set, call create entity index to ensure the aliases are created
+ appIds.forEach(appId -> {
+ ApplicationScope appScope = CpNamingUtils.getApplicationScope(appId);
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(appScope));
+ }
+ );
+ }
+
+
+ // Delegates to the underlying queue manager's depth metric.
+ @Override
+ public long getQueueDepth() {
+ return queue.getQueueDepth();
+ }
+
+ // Entity deletes still take the queue hop; wrapped in an EntityIdScope to avoid
+ // serialization issues with the full entity.
+ @Override
+ public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+
+ offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
+ }
+
+ /**
+ * Consumer-side handler for EntityDeleteEvent: performs the version/graph cleanup IO
+ * (blocking on each observable in turn), then queues the de-index batch.
+ * NOTE(review): as in handleEdgeDelete, the second null-check re-validates 'message' where its
+ * text says "QueueMessage Body" -- should check 'event'.
+ */
+ public void handleEntityDelete(final QueueMessage message) {
+
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
+
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+ Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" );
+ Preconditions.checkArgument( event instanceof EntityDeleteEvent,
+ String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) );
+
+
+ final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event;
+ final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope();
+ final Id entityId = entityDeleteEvent.getEntityIdScope().getId();
+
+ if (logger.isDebugEnabled())
+ logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+
+ final EventBuilderImpl.EntityDeleteResults
+ entityDeleteResults = eventBuilder.buildEntityDelete( applicationScope, entityId );
+
+
+ // Delete the entities and remove from graph separately
+ entityDeleteResults.getEntitiesDeleted().toBlocking().lastOrDefault(null);
+
+ entityDeleteResults.getCompactedNode().toBlocking().lastOrDefault(null);
+
+ IndexOperationMessage indexMessage = entityDeleteResults.getIndexObservable().toBlocking().lastOrDefault(null);
+
+ queueIndexOperationMessage(indexMessage);
+
+ }
+
+
+ /**
+ * Consumer-side handler for InitializeApplicationIndexEvent: creates and initializes the
+ * entity index for the replicated location strategy carried in the event (runs in every
+ * region, since the event is published to the topic).
+ */
+ public void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+ Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
+
+ final InitializeApplicationIndexEvent initializeApplicationIndexEvent =
+ ( InitializeApplicationIndexEvent ) event;
+
+ final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy();
+ final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy );
+ index.initialize();
+ }
+
+ /**
+ * Loop through and start the workers
+ * Worker count comes from IndexProcessorFig; called once from the constructor.
+ */
+ public void start() {
+ final int count = indexProcessorFig.getWorkerCount();
+
+ for (int i = 0; i < count; i++) {
+ startWorker();
+ }
+ }
+
+
+ /**
+ * Stop the workers
+ * NOTE(review): the subscriptions list is never cleared, so a stop()/start() cycle leaves
+ * stale (unsubscribed) entries in the list and stop() would unsubscribe them again.
+ */
+ public void stop() {
+ synchronized (mutex) {
+ //stop consuming
+
+ for (final Subscription subscription : subscriptions) {
+ subscription.unsubscribe();
+ }
+ }
+ }
+
+
+ /**
+ * Starts one consumer worker: an Rx pipeline that loops take() forever on a dedicated thread,
+ * fans batches out via flatMap onto the async IO scheduler, runs the event handlers, and acks
+ * only the successfully-processed messages. The resulting Subscription is tracked for stop().
+ */
+ private void startWorker() {
+ synchronized (mutex) {
+
+ Observable<List<QueueMessage>> consumer =
+ Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+ @Override
+ public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+
+ //name our thread so it's easy to see
+ Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
+
+ List<QueueMessage> drainList = null;
+
+ do {
+ try {
+ drainList = take();
+ //emit our list in its entirety to hand off to a worker pool
+ subscriber.onNext(drainList);
+
+ //take since we're in flight
+ // NOTE(review): inFlight is incremented AFTER onNext hands the batch off, so a
+ // fast downstream ack could decrement before this increment runs.
+ inFlight.addAndGet( drainList.size() );
+ }
+ catch ( Throwable t ) {
+ final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+ logger.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
+
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
+ }
+
+
+ try {
+ Thread.sleep( sleepTime );
+ }
+ catch ( InterruptedException ie ) {
+ //swallow
+ }
+
+ indexErrorCounter.inc();
+ }
+ }
+ while ( true );
+ }
+ } ) //this won't block our read loop, just reads and proceeds
+ .flatMap( sqsMessages -> {
+
+ //do this on a different schedule, and introduce concurrency with flatmap for faster processing
+ return Observable.just( sqsMessages )
+
+ .map( messages -> {
+ if ( messages == null || messages.size() == 0 ) {
+ return null;
+ }
+
+ try {
+ List<IndexEventResult> indexEventResults = callEventHandlers( messages );
+ List<QueueMessage> messagesToAck = ackMessages( indexEventResults );
+
+ if ( messagesToAck == null || messagesToAck.size() == 0 ) {
+ logger.error(
+ "No messages came back from the queue operation, should have seen {} messages",
+ messages.size() );
+ return messagesToAck;
+ }
+
+ if ( messagesToAck.size() < messages.size() ) {
+ // NOTE(review): this SLF4J call has no {} placeholders for its two
+ // arguments, so 'messages'/'messagesToAck' are never rendered.
+ logger.error( "Missing messages from queue post operation",
+ messages, messagesToAck );
+ }
+ //ack each message, but only if we didn't error.
+ ack( messagesToAck );
+ return messagesToAck;
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to ack messages to sqs", e );
+ return null;
+ //do not rethrow so we can process all of them
+ }
+ } ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
+ //end flatMap
+ }, indexProcessorFig.getEventConcurrencyFactor() );
+
+ //start in the background
+
+ final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();
+
+ subscriptions.add(subscription);
+ }
+ }
+
+ /**
+ * Submit results to index and return the queue messages to be ack'd
+ * Records per-message cycle time into the histogram as a side effect of the stream, then keeps
+ * only results whose QueueMessage is present (absent = handler failed or asked for retry).
+ * @param indexEventResults
+ * @return messages safe to ack, or null if the input was null
+ */
+ private List<QueueMessage> ackMessages(List<IndexEventResult> indexEventResults) {
+ //if nothing came back then return null
+ if(indexEventResults==null){
+ return null;
+ }
+
+ // stream the messages to record the cycle time
+ return indexEventResults.stream()
+ .map(indexEventResult -> {
+ //record the cycle time
+ messageCycle.update(System.currentTimeMillis() - indexEventResult.getCreationTime());
+ return indexEventResult;
+ })
+ // filter out messages that are not present, they were not processed and put into the results
+ .filter( result -> result.getQueueMessage().isPresent() )
+ .map(result -> result.getQueueMessage().get())
+ // collect
+ .collect(Collectors.toList());
+ }
+
+ // Re-index entry point (ReIndexAction): queues an EntityIndexEvent for a single entity,
+ // only indexing versions updated since 'updatedSince'.
+ public void index(final ApplicationScope applicationScope, final Id id, final long updatedSince) {
+ //change to id scope to avoid serialization issues
+ offer( new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, id ), updatedSince ) );
+ }
+
+ // Batch re-index entry point: one EntityIndexEvent per edge target node, sent in a single
+ // queue call. NOTE(review): 'batch' is declared as a raw List; should be List<EntityIndexEvent>.
+ public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
+
+ List batch = new ArrayList<EdgeScope>();
+ for ( EdgeScope e : edges){
+ //change to id scope to avoid serialization issues
+ batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
+ }
+ offerBatch( batch );
+ }
+
+
+ /**
+ * Result of handling one queue message: the message itself when it was processed successfully
+ * (and may be ack'd), or Optional.absent() when it must be left on the queue for retry.
+ * creationTime is the originating event's timestamp, used for the message-cycle histogram.
+ * NOTE(review): inner class holds an implicit reference to the service; could be static.
+ */
+ public class IndexEventResult{
+ private final Optional<QueueMessage> queueMessage;
+ private final long creationTime;
+
+ public IndexEventResult(Optional<QueueMessage> queueMessage, long creationTime){
+
+ this.queueMessage = queueMessage;
+ this.creationTime = creationTime;
+ }
+
+
+ public Optional<QueueMessage> getQueueMessage() {
+ return queueMessage;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 96da2df..abd4ce1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -104,14 +104,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
switch (impl) {
case LOCAL:
- AmazonAsyncEventService eventService = new AmazonAsyncEventService(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
+ AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
eventService.MAX_TAKE = 1000;
return eventService;
case SQS:
throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
case SNS:
- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
+ return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index 480756f..a47ec77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -39,14 +39,6 @@ import rx.Observable;
public interface EventBuilder {
/**
- * Return the cold observable of entity index update operations
- * @param applicationScope
- * @param entity
- * @return
- */
- Observable<IndexOperationMessage> buildEntityIndexUpdate( ApplicationScope applicationScope, Entity entity );
-
- /**
* Return the cold observable of the new edge operation
* @param applicationScope
* @param entity
@@ -69,7 +61,9 @@ public interface EventBuilder {
* @param entityId
* @return
*/
- EventBuilderImpl.EntityDeleteResults buildEntityDelete( ApplicationScope applicationScope, Id entityId );
+ EntityDeleteResults buildEntityDelete(ApplicationScope applicationScope, Id entityId );
+
+
/**
* Re-index an entity in the scope provided
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index 4e476db..2edc668 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -73,19 +73,6 @@ public class EventBuilderImpl implements EventBuilder {
}
- @Override
- public Observable<IndexOperationMessage> buildEntityIndexUpdate( final ApplicationScope applicationScope,
- final Entity entity ) {
- //process the entity immediately
- //only process the same version, otherwise ignore
-
- if (logger.isDebugEnabled()) {
- logger.debug("Indexing in app scope {} entity {}", entity, applicationScope);
- }
-
- return indexService.indexEntity( applicationScope, entity );
- }
-
@Override
public Observable<IndexOperationMessage> buildNewEdge( final ApplicationScope applicationScope, final Entity entity,
@@ -118,7 +105,7 @@ public class EventBuilderImpl implements EventBuilder {
//it'll need to be pushed up higher so we can do the marking that isn't async or does it not matter?
@Override
- public EntityDeleteResults buildEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
+ public EntityDeleteResults buildEntityDelete(final ApplicationScope applicationScope, final Id entityId ) {
if (logger.isDebugEnabled()) {
logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
new file mode 100644
index 0000000..c0e022f
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/IndexDocNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.util.UUID;
+
+public class IndexDocNotFoundException extends RuntimeException {
+
+ final UUID batchId;
+
+ public IndexDocNotFoundException(final UUID batchId){
+
+ super("Index batch ID "+batchId.toString()+" not found in map persistence");
+ this.batchId = batchId;
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index 68c398f..7512c90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.index;
import java.util.Iterator;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.utils.UUIDUtils;
@@ -104,7 +105,8 @@ public class IndexServiceImpl implements IndexService {
//do our observable for batching
//try to send a whole batch if we can
- final Observable<IndexOperationMessage> batches = sourceEdgesToIndex.buffer( indexFig.getIndexBatchSize() )
+ final Observable<IndexOperationMessage> batches = sourceEdgesToIndex
+ .buffer( indexFig.getIndexBatchSize(), 1000, TimeUnit.MILLISECONDS )
//map into batches based on our buffer size
.flatMap( buffer -> Observable.from( buffer )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b4634dc4/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
index bf444b5..d47e96c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/CandidateEntityFilter.java
@@ -230,10 +230,16 @@ public class CandidateEntityFilter extends AbstractFilter<FilterResult<Candidate
//entity is newer than ES version, could be an update or the entity is marked as deleted
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ||
+ !entity.getEntity().isPresent() ||
+ entity.getStatus() == MvccEntity.Status.DELETED ) {
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ // when updating entities, we don't delete previous versions from ES so this action is expected
+ if(logger.isDebugEnabled()){
+ logger.debug( "Deindexing stale entity on edge {} for entityId {} and version {}",
searchEdge, entityId, entityVersion);
+ }
+
batch.deindex( searchEdge, entityId, candidateVersion );
return;
}