You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/15 16:18:02 UTC
[12/50] [abbrv] usergrid git commit: commit stash
commit stash
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7c5a864d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7c5a864d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7c5a864d
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 7c5a864dd1ce22ada5456ced1a6c10f9a5533b1f
Parents: 7dceb56
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 6 14:34:33 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 6 14:34:33 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 71 ++++++++++++++------
.../index/impl/EsIndexProducerImpl.java | 6 +-
2 files changed, 53 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7c5a864d/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 14d37b5..1bc70cd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Optional;
@@ -226,6 +227,28 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
+ /**
+ * Ack message in SQS
+ */
+ public void ack(final List<QueueMessage> messages) {
+
+ final Timer.Context timer = this.ackTimer.time();
+
+ try{
+ queue.commitMessages(messages);
+
+ //decrement our in-flight counter
+ inFlight.decrementAndGet();
+
+ }catch(Exception e){
+ throw new RuntimeException("Unable to ack messages", e);
+ }finally {
+ timer.stop();
+ }
+
+
+ }
+
private Observable<QueueMessage> handleMessages( final List<QueueMessage> messages ) {
if (logger.isDebugEnabled()) {
@@ -243,7 +266,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+ return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
}
try {
//merge each operation to a master observable;
@@ -258,23 +281,24 @@ public class AmazonAsyncEventService implements AsyncEventService {
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
handleInitializeApplicationIndex(message);
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
} else {
- logger.error("Unknown EventType: {}", event);
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+ logger.error("Unknown EventType: {}", event);//TODO: print json instead
+ return Observable.just(new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent()));
}
} catch (Exception e) {
logger.error("Failed to index entity", e, message);
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ return Observable.just(new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent()));
} finally {
messageCycle.update(System.currentTimeMillis() - event.getCreationTime());
}
});
+ //filter for success, send to the index(optional), ack
return masterObservable
//remove unsuccessful
- .filter(indexEventResult -> indexEventResult.shouldProcess())
+ .filter(indexEventResult -> indexEventResult.getQueueMessage().isPresent())
//take the max
.buffer( MAX_TAKE )
//map them to index results and return them
@@ -290,8 +314,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
//ack after successful completion of the operation.
return indexProducer.put(combined)
- .flatMap(operationResult -> Observable.from(indexEventResults))
- .map(result -> result.getQueueMessage());
+ .flatMap(indexOperationMessage -> Observable.from(indexEventResults))
+ .map(result -> result.getQueueMessage().get());
});
@@ -303,10 +327,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
){
try{
return operation.call(queueMessage)
- .map(indexOperationMessage -> new IndexEventResult(queueMessage, Optional.fromNullable(indexOperationMessage), true));
+ .map(indexOperationMessage -> new IndexEventResult(Optional.fromNullable(queueMessage), Optional.fromNullable(indexOperationMessage)));
}catch (Exception e){
logger.error("failed to run index",e);
- return Observable.just( new IndexEventResult(queueMessage, Optional.<IndexOperationMessage>absent(),false));
+ return Observable.just( new IndexEventResult(Optional.fromNullable(queueMessage), Optional.<IndexOperationMessage>absent()));
}
}
@@ -543,10 +567,17 @@ public class AmazonAsyncEventService implements AsyncEventService {
//this won't block our read loop, just reads and proceeds
.flatMap(messages ->
handleMessages(messages)
- .doOnNext(message -> {
- //ack each message, but only if we didn't error.
- ack(message);
+ .buffer(MAX_TAKE)
+ .doOnNext(messagesToAck -> {
+ try {
+ //ack each message, but only if we didn't error.
+ ack(messagesToAck);
+ } catch (Exception e) {
+ logger.error("failed to ack messages to sqs", messagesToAck.get(0).getMessageId(), e);
+ //do not rethrow so we can process all of them
+ }
})
+ .flatMap(messagesToAck -> Observable.from(messagesToAck))
);
//start in the background
@@ -574,23 +605,19 @@ public class AmazonAsyncEventService implements AsyncEventService {
public class IndexEventResult{
- private final QueueMessage queueMessage;
+ private final Optional<QueueMessage> queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage;
- private final boolean shouldProcess;
- public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean shouldProcess){
+
+ public IndexEventResult(Optional<QueueMessage> queueMessage, Optional<IndexOperationMessage> indexOperationMessage ){
this.queueMessage = queueMessage;
this.indexOperationMessage = indexOperationMessage;
- this.shouldProcess = shouldProcess;
- }
- public QueueMessage getQueueMessage() {
- return queueMessage;
}
- public boolean shouldProcess() {
- return shouldProcess;
+ public Optional<QueueMessage> getQueueMessage() {
+ return queueMessage;
}
public Optional<IndexOperationMessage> getIndexOperationMessage() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/7c5a864d/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 409c2bc..e778948 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -115,6 +115,7 @@ public class EsIndexProducerImpl implements IndexProducer {
final Observable<IndexOperation> index = Observable.from(batch.getIndexRequests());
final Observable<DeIndexOperation> deIndex = Observable.from(batch.getDeIndexRequests());
+ //TODO: look at indexing ordering
final Observable<BatchOperation> batchOps = Observable.merge(index, deIndex);
//buffer into the max size we can send ES and fire them all off until we're completed
@@ -207,9 +208,10 @@ public class EsIndexProducerImpl implements IndexProducer {
if ( error ) {
if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){
try{
- log.warn("Encountered Queue Capacity Exception from ElasticSearch slowing by " +indexFig.getSleepTimeForQueueError());
+ log.warn("Encountered Queue Capacity Exception from ElasticSearch slowing by "
+ + indexFig.getSleepTimeForQueueError() );
Thread.sleep(indexFig.getSleepTimeForQueueError());
- }catch (InterruptedException ie){
+ }catch (Exception e){
//move on
}
}