You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/28 14:53:22 UTC
[17/50] [abbrv] incubator-usergrid git commit: Renamed
SQSAsyncEventService to AmazonAsyncEventService,
implemented functions for Async indexing
Renamed SQSAsyncEventService to AmazonAsyncEventService, implemented functions for Async indexing
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/be0e43c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/be0e43c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/be0e43c1
Branch: refs/heads/USERGRID-669
Commit: be0e43c1975045441947821f98eab8ce88382f93
Parents: 6d0ebd0
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:01:58 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:01:58 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 425 +++++++++++++++++++
.../asyncevents/SQSAsyncEventService.java | 356 ----------------
2 files changed, 425 insertions(+), 356 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/be0e43c1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
new file mode 100644
index 0000000..2cb6b3b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.asyncevents;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import org.apache.usergrid.corepersistence.CpEntityManager;
+import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
+import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
+import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+import rx.Observable;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.schedulers.Schedulers;
+
+
+@Singleton
+public class AmazonAsyncEventService implements AsyncEventService {
+
+
+    private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
+
+    /** Number of messages requested per read; SQS doesn't support more than 10. */
+    private static final int MAX_TAKE = 10;
+
+    /** Name used to build the queue scope this service reads from and writes to. */
+    private static final String QUEUE_NAME = "es_queue";
+
+    private final QueueManager queue;
+    private final IndexProcessorFig indexProcessorFig;
+    private final IndexService indexService;
+    private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final RxTaskScheduler rxTaskScheduler;
+
+    private final Timer readTimer;
+    private final Timer writeTimer;
+    private final Timer messageProcessingTimer;
+
+    /** Lock held while workers are started or stopped. */
+    private final Object mutex = new Object();
+
+    private final Counter indexErrorCounter;
+
+    /** Source of the numeric suffix used when naming consumer threads. */
+    private final AtomicLong counter = new AtomicLong();
+
+    /** Value reported by the "inflight.meter" gauge. */
+    private final AtomicLong inFlight = new AtomicLong();
+
+    /** Subscriptions of the workers started so far; only touched while holding {@code mutex}. */
+    private final List<Subscription> subscriptions = new ArrayList<>();
+
+
+ @Inject
+ public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
+ final IndexProcessorFig indexProcessorFig,
+ final MetricsFactory metricsFactory,
+ final IndexService indexService,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final RxTaskScheduler rxTaskScheduler) {
+
+ this.indexService = indexService;
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.rxTaskScheduler = rxTaskScheduler;
+
+ final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+ this.queue = queueManagerFactory.getQueueManager(queueScope);
+ this.indexProcessorFig = indexProcessorFig;
+
+ this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "write");
+ this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "read");
+ this.messageProcessingTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "message.processing");
+ this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "error");
+
+
+ //wire up the gauge of inflight message
+ metricsFactory.addGauge(AmazonAsyncEventService.class, "inflight.meter", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return inFlight.longValue();
+ }
+ });
+
+ start();
+ }
+
+
+    /**
+     * Sends the given event object to the queue, recording the elapsed time on the write timer.
+     *
+     * @param operation the payload handed to {@code QueueManager.sendMessage}
+     * @throws RuntimeException wrapping the {@link IOException} when the send fails
+     */
+    private void offer(final Object operation) {
+        final Timer.Context writeContext = writeTimer.time();
+
+        try {
+            queue.sendMessage(operation);
+        } catch (IOException ioe) {
+            throw new RuntimeException("Unable to queue message", ioe);
+        } finally {
+            // always record the attempt, successful or not
+            writeContext.stop();
+        }
+    }
+
+
+ /**
+ * Take message from SQS
+ */
+ public List<QueueMessage> take() {
+
+ //SQS doesn't support more than 10
+ final Timer.Context timer = this.readTimer.time();
+
+ try {
+ return queue.getMessages(MAX_TAKE,
+ indexProcessorFig.getIndexQueueVisibilityTimeout(),
+ indexProcessorFig.getIndexQueueTimeout(),
+ AsyncEvent.class);
+ }
+ //stop our timer
+ finally {
+ timer.stop();
+ }
+ }
+
+
+    /**
+     * Commits a batch of messages on the queue. An empty batch is a no-op.
+     *
+     * @param messages the messages to commit
+     */
+    public void ack(final List<QueueMessage> messages) {
+
+        if (!messages.isEmpty()) {
+            queue.commitMessages(messages);
+        }
+    }
+
+    /**
+     * Commits a single message on the queue.
+     *
+     * @param message the message to commit
+     */
+    public void ack(final QueueMessage message) {
+        this.queue.commitMessage(message);
+    }
+
+ private void handleMessages(final List<QueueMessage> messages) {
+
+ if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size());
+
+ for (QueueMessage message : messages) {
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+
+ if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType());
+
+ switch (event.getEventType()) {
+
+ case EDGE_DELETE:
+ handleEdgeDelete(message);
+ break;
+
+ case EDGE_INDEX:
+ handleEdgeIndex(message);
+ break;
+
+ case ENTITY_DELETE:
+ handleEntityDelete(message);
+ break;
+
+ case ENTITY_INDEX:
+ handleEntityIndexUpdate(message);
+ break;
+
+ default:
+ logger.error("Unknown EventType: {}", event.getEventType());
+
+ }
+ }
+ }
+
+
+    /**
+     * Queues an {@code EntityIndexEvent} for the given entity's id within the application scope.
+     */
+    @Override
+    public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
+                                       final Entity entity) {
+
+        final EntityIdScope entityIdScope = new EntityIdScope(applicationScope, entity.getId());
+
+        offer(new EntityIndexEvent(entityIdScope));
+    }
+
+
+    /**
+     * Handles an ENTITY_INDEX message: loads the entity named by the event's entity id scope,
+     * indexes it, and acks the message for each result the index operation emits.
+     *
+     * @param message the queue message whose body is an {@code AsyncEvent} of type ENTITY_INDEX
+     * @throws NullPointerException     if the message or its body is null
+     * @throws IllegalArgumentException if the event is not of type ENTITY_INDEX
+     */
+    public void handleEntityIndexUpdate(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        // validate the body itself (the original re-checked the message here)
+        Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleEntityIndexUpdate");
+        // Preconditions formats the template only when the check fails
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX,
+                "Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType());
+
+        //process the entity immediately
+        //only process the same version, otherwise ignore
+        final EntityIdScope entityIdScope = event.getEntityIdScope();
+        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+
+        ecm.load(entityIdScope.getId())
+                .first()
+                .flatMap(entity -> indexService.indexEntity(applicationScope, entity))
+                .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    /**
+     * Queues an {@code EdgeIndexEvent} describing a new edge for the given entity.
+     */
+    @Override
+    public void queueNewEdge(final ApplicationScope applicationScope,
+                             final Entity entity,
+                             final Edge newEdge) {
+
+        offer(new EdgeIndexEvent(applicationScope, entity.getId(), newEdge));
+    }
+
+    /**
+     * Handles an EDGE_INDEX message: loads the entity referenced by the event, indexes the edge
+     * for it, and acks the message for each result the index operation emits.
+     *
+     * @param message the queue message whose body is an {@code AsyncEvent} of type EDGE_INDEX
+     * @throws NullPointerException     if the message or its body is null
+     * @throws IllegalArgumentException if the event is not of type EDGE_INDEX
+     */
+    public void handleEdgeIndex(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeIndex");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        // validate the body itself (the original re-checked the message here)
+        Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleEdgeIndex");
+        // Preconditions formats the template only when the check fails
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX,
+                "Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType());
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Edge edge = event.getEdge();
+
+        final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager(applicationScope);
+
+        ecm.load(event.getEntityId())
+                .flatMap(entity -> indexService.indexEdge(applicationScope, entity, edge))
+                .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+    /**
+     * Queues an {@code EdgeDeleteEvent} for the given edge within the application scope.
+     */
+    @Override
+    public void queueDeleteEdge(final ApplicationScope applicationScope,
+                                final Edge edge) {
+
+        final EdgeDeleteEvent deleteEvent = new EdgeDeleteEvent(applicationScope, edge);
+
+        offer(deleteEvent);
+    }
+
+    /**
+     * Handles an EDGE_DELETE message: removes the edge from the index and acks the message for
+     * each result the delete operation emits.
+     *
+     * @param message the queue message whose body is an {@code AsyncEvent} of type EDGE_DELETE
+     * @throws NullPointerException     if the message or its body is null
+     * @throws IllegalArgumentException if the event is not of type EDGE_DELETE
+     */
+    public void handleEdgeDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEdgeDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        // validate the body itself (the original re-checked the message here)
+        Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleEdgeDelete");
+        // Preconditions formats the template only when the check fails
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE,
+                "Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType());
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Edge edge = event.getEdge();
+
+        if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge);
+
+        indexService.deleteIndexEdge(applicationScope, edge)
+                .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    /**
+     * Queues an {@code EntityDeleteEvent} for the given entity id within the application scope.
+     */
+    @Override
+    public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
+
+        final EntityIdScope entityIdScope = new EntityIdScope(applicationScope, entityId);
+
+        offer(new EntityDeleteEvent(entityIdScope));
+    }
+
+    /**
+     * Handles an ENTITY_DELETE message: removes the entity's index entries and acks the message
+     * for each result the delete operation emits.
+     *
+     * The message is acked only from the delete pipeline. Acking before the delete has run, as the
+     * original did, would commit the message even when the delete fails and ack it twice on success.
+     *
+     * @param message the queue message whose body is an {@code AsyncEvent} of type ENTITY_DELETE
+     * @throws NullPointerException     if the message or its body is null
+     * @throws IllegalArgumentException if the event is not of type ENTITY_DELETE
+     */
+    public void handleEntityDelete(final QueueMessage message) {
+
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+
+        // validate the body itself (the original re-checked the message here)
+        Preconditions.checkNotNull(event, "QueueMessage Body cannot be null for handleEntityDelete");
+        // Preconditions formats the template only when the check fails
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE,
+                "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType());
+
+        // NOTE(review): the event is queued from an EntityIdScope, while the entity-index handler
+        // reads getEntityIdScope(); confirm getApplicationScope()/getEntityId() are populated here.
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final Id entityId = event.getEntityId();
+
+        if (logger.isDebugEnabled())
+            logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId);
+
+        indexService.deleteEntityIndexes(applicationScope, entityId)
+                .doOnNext(ignore -> ack(message)).subscribe();
+    }
+
+
+    /**
+     * Starts as many workers as {@code IndexProcessorFig.getWorkerCount()} reports.
+     */
+    public void start() {
+        for (int remaining = indexProcessorFig.getWorkerCount(); remaining > 0; remaining--) {
+            startWorker();
+        }
+    }
+
+
+    /**
+     * Unsubscribes every worker subscription recorded so far, while holding the worker lock.
+     */
+    public void stop() {
+        synchronized (mutex) {
+            subscriptions.forEach(Subscription::unsubscribe);
+        }
+    }
+
+
+    /**
+     * Starts one background consumer on its own thread and records its subscription.
+     *
+     * The consumer repeatedly takes a batch from the queue and emits it downstream, where
+     * {@code handleMessages} runs as a {@code doOnNext} callback. The loop ends when the subscriber
+     * is unsubscribed (for example by {@code stop()}) or when the thread is interrupted while
+     * backing off after a failure.
+     */
+    private void startWorker() {
+        synchronized (mutex) {
+
+            Observable<List<QueueMessage>> consumer =
+                    Observable.create(new Observable.OnSubscribe<List<QueueMessage>>() {
+                        @Override
+                        public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
+
+                            //name our thread so it's easy to see
+                            Thread.currentThread().setName("QueueConsumer_" + counter.incrementAndGet());
+
+                            //keep consuming until we are unsubscribed, so stop() can actually end the worker
+                            while (!subscriber.isUnsubscribed()) {
+
+                                try {
+                                    //take() records the read timer itself, no need to time it again here
+                                    final List<QueueMessage> drainList = take();
+                                    final int taken = drainList.size();
+
+                                    //the batch counts as in flight only while it is being emitted
+                                    inFlight.addAndGet(taken);
+
+                                    final Timer.Context processing = messageProcessingTimer.time();
+
+                                    try {
+                                        //emit our list in its entirety to hand off for processing
+                                        subscriber.onNext(drainList);
+                                    } finally {
+                                        processing.stop();
+                                        inFlight.addAndGet(-1L * taken);
+                                    }
+                                } catch (Throwable t) {
+                                    final long sleepTime = indexProcessorFig.getFailureRetryTime();
+
+                                    logger.error("Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t);
+
+                                    indexErrorCounter.inc();
+
+                                    try {
+                                        Thread.sleep(sleepTime);
+                                    } catch (InterruptedException ie) {
+                                        //restore the flag and stop consuming instead of swallowing the interrupt
+                                        Thread.currentThread().interrupt();
+                                        return;
+                                    }
+                                }
+                            }
+                        }
+                    })
+                            //runs in the consumer thread as part of onNext
+                            .doOnNext(this::handleMessages).subscribeOn(Schedulers.newThread());
+
+            //start in the background
+
+            final Subscription subscription = consumer.subscribe();
+
+            subscriptions.add(subscription);
+        }
+    }
+
+    /**
+     * Currently a no-op: the operation is accepted and nothing is queued.
+     */
+    @Override
+    public void index(final EntityIndexOperation entityIdScope) {
+        //todo need this?
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/be0e43c1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
deleted file mode 100644
index 1dbfd4e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.asyncevents;
-
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexService;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Timer;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-
-import rx.Observable;
-import rx.Subscriber;
-import rx.Subscription;
-import rx.schedulers.Schedulers;
-
-
-@Singleton
-public class SQSAsyncEventService implements AsyncEventService {
-
-
- private static final Logger log = LoggerFactory.getLogger( SQSAsyncEventService.class );
-
- /**
- * Set our TTL to 1 month. This is high, but in the event of a bug, we want these entries to get removed
- */
- public static final int TTL = 60 * 60 * 24 * 30;
-
-
- private static final int MAX_TAKE = 10;
-
- private static final String QUEUE_NAME = "es_queue";
-
- private final QueueManager queue;
- private final IndexProcessorFig indexProcessorFig;
- private final IndexService indexService;
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final RxTaskScheduler rxTaskScheduler;
-
- private final Timer readTimer;
- private final Timer writeTimer;
- private final Timer messageProcessingTimer;
-
- private final Object mutex = new Object();
-
-
- private final Counter indexErrorCounter;
- private final AtomicLong counter = new AtomicLong();
- private final AtomicLong inFlight = new AtomicLong();
-
- //the actively running subscription
- private List<Subscription> subscriptions = new ArrayList<>();
-
-
- @Inject
- public SQSAsyncEventService( final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig, final MetricsFactory metricsFactory,
- final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final RxTaskScheduler rxTaskScheduler ) {
-
- this.indexService = indexService;
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.rxTaskScheduler = rxTaskScheduler;
-
- final QueueScope queueScope = new QueueScopeImpl( QUEUE_NAME );
- this.queue = queueManagerFactory.getQueueManager( queueScope );
- this.indexProcessorFig = indexProcessorFig;
-
- this.writeTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "write" );
- this.readTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "read" );
- this.messageProcessingTimer = metricsFactory.getTimer( SQSAsyncEventService.class, "message.processing" );
- this.indexErrorCounter = metricsFactory.getCounter( SQSAsyncEventService.class, "error" );
-
-
- //wire up the gauge of inflight messages
- metricsFactory.addGauge( SQSAsyncEventService.class, "inflight.meter", new Gauge<Long>() {
- @Override
- public Long getValue() {
- return inFlight.longValue();
- }
- } );
-
- start();
- }
-
-
- /**
- * Offer the EntityIdScope to SQS
- */
- private void offer( final EntityIdScope operation ) {
- final Timer.Context timer = this.writeTimer.time();
-
- try {
- //signal to SQS
- this.queue.sendMessage( operation );
- }
- catch ( IOException e ) {
- throw new RuntimeException( "Unable to queue message", e );
- }
- finally {
- timer.stop();
- }
- }
-
-
- /**
- * Take messages from SQS
- */
- public List<QueueMessage> take() {
-
- //SQS doesn't support more than 10
- final Timer.Context timer = this.readTimer.time();
-
- try {
- return queue.getMessages( MAX_TAKE, indexProcessorFig.getIndexQueueTimeout(), indexProcessorFig.getIndexQueueTimeout(),
- EntityIdScope.class );
- }
- //stop our timer
- finally {
- timer.stop();
- }
- }
-
-
- /**
- * Ack messages in SQS
- */
- public void ack( final List<QueueMessage> messages ) {
-
- /**
- * No op
- */
- if ( messages.size() == 0 ) {
- return;
- }
-
- queue.commitMessages( messages );
- }
-
-
-// @Override
- public void index( final EntityIdScope entityIdScope ) {
- //queue the re-inex operation
- offer( entityIdScope );
- }
-
-
- @Override
- public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
-
- //create our scope
- final EntityIdScope entityIdScope = new EntityIdScope( applicationScope, entity.getId() );
-
- //send it to SQS for indexing
- index( entityIdScope );
- }
-
-
- @Override
- public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
- throw new NotImplementedException( "Implement me" );
- }
-
-
- @Override
- public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
- throw new NotImplementedException( "Implement me" );
- }
-
-
- @Override
- public void queueEntityDelete( final ApplicationScope applicationScope, final Id entityId ) {
- throw new NotImplementedException( "Implement me" );
- }
-
-
- /**
- * Index an entity and return an observable of the queue message on success
- */
- private Observable<IndexOperationMessage> indexEntity( final QueueMessage queueMessage ) {
- final EntityIdScope entityIdScope = ( EntityIdScope ) queueMessage.getBody();
- final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- //run the index operation from the entity
- return entityCollectionManager.load( entityIdScope.getId() )
- //invoke the indexing and take the last value
- .flatMap( entity -> indexService.indexEntity( applicationScope, entity ).last() );
- }
-
-
- /**
- * Do the indexing for a list of queue messages
- */
- private void doIndex( final List<QueueMessage> queueMessages ) {
- //create parallel observables to process all 10 messages
- final Observable<Long> observable = Observable.from( queueMessages ).flatMap( ( QueueMessage queueMessage ) -> {
- return indexEntity( queueMessage ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() );
- }, MAX_TAKE ).countLong()
-
- //remove our in flight
- .doOnNext( count -> inFlight.addAndGet( -1 * count ) )
-
- //do on completed ack messages. Assumes no expections were thrown
- .doOnCompleted( () -> ack( queueMessages ) );
-
- //wrap with our timer and fire
- ObservableTimer.time( observable, messageProcessingTimer ).subscribe();
- }
-
-
- /**
- * Loop throught and start the workers
- */
- public void start() {
- final int count = indexProcessorFig.getWorkerCount();
-
- for ( int i = 0; i < count; i++ ) {
- startWorker();
- }
- }
-
-
- /**
- * Stop the workers
- */
- public void stop() {
- synchronized ( mutex ) {
- //stop consuming
-
- for ( final Subscription subscription : subscriptions ) {
- subscription.unsubscribe();
- }
- }
- }
-
-
- private void startWorker() {
- synchronized ( mutex ) {
-
- Observable<List<QueueMessage>> consumer =
- Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
- @Override
- public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
-
- //name our thread so it's easy to see
- Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
-
- List<QueueMessage> drainList = null;
-
- do {
- Timer.Context timer = readTimer.time();
-
- try {
- drainList = take();
-
- //emit our list in it's entirity to hand off to a worker pool
- subscriber.onNext( drainList );
-
- //take since we're in flight
- inFlight.addAndGet( drainList.size() );
- }
-
- catch ( Throwable t ) {
- final long sleepTime = indexProcessorFig.getFailureRetryTime();
-
- log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
-
- if ( drainList != null ) {
- inFlight.addAndGet( -1 * drainList.size() );
- }
-
-
- try {
- Thread.sleep( sleepTime );
- }
- catch ( InterruptedException ie ) {
- //swallow
- }
-
- indexErrorCounter.inc();
- }
-
- finally{
- timer.stop();
- }
- }
- while ( true );
- }
- } )
- //this won't block our read loop, just reads and proceeds
- .doOnNext( messages -> doIndex( messages ) ).subscribeOn( Schedulers.newThread() );
-
- //start in the background
-
- final Subscription subscription = consumer.subscribe();
-
- subscriptions.add( subscription );
- }
- }
-
-
- @Override
- public void index( final EntityIndexOperation entityIdScope ) {
- throw new NotImplementedException( "Implement me" );
- }
-}