You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/06 22:35:00 UTC
[2/9] usergrid git commit: filter some ops out, check for queue overflow
filter some ops out, check for queue overflow
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f35d01c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f35d01c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f35d01c7
Branch: refs/heads/2.1-release
Commit: f35d01c730998d45ca6feb5edf9138f04d185951
Parents: 5f20ece
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 13:59:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 13:59:10 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 31 ++++++++++++--------
.../usergrid/persistence/index/IndexFig.java | 3 ++
.../index/impl/EsIndexProducerImpl.java | 7 +++++
.../queue/impl/QueueManagerFactoryImpl.java | 2 +-
4 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 92faed4..e215d48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -239,7 +239,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
if (event == null) {
logger.error("AsyncEvent type or event is null!");
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
}
try {
//merge each operation to a master observable;
@@ -254,10 +254,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
} else if (event instanceof InitializeApplicationIndexEvent) {
//does not return observable
handleInitializeApplicationIndex(message);
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
} else {
logger.error("Unknown EventType: {}", event);
- return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+ return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
}
}catch (Exception e){
logger.error("Failed to index entity", e,message);
@@ -270,20 +270,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
return masterObservable
//remove unsuccessful
- .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage()
- .isPresent() )
+ .filter( indexEventResult -> indexEventResult.shouldProcess() )
//take the max
.buffer( MAX_TAKE )
//map them to index results and return them
.flatMap( indexEventResults -> {
IndexOperationMessage combined = new IndexOperationMessage();
indexEventResults.stream().forEach(
- indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get() ) );
+ indexEventResult ->{
+ if(indexEventResult.getIndexOperationMessage().isPresent()) {
+ combined.ingest(indexEventResult.getIndexOperationMessage().get());
+ }
+ } );
+
//ack after successful completion of the operation.
- return indexProducer.put( combined ).flatMap( operationResult -> Observable.from( indexEventResults ) )
+ return indexProducer.put( combined )
+ .flatMap( operationResult -> Observable.from( indexEventResults ) )
//ack each message, but only if we didn't error. If we did, we'll want to log it and
- .map( indexEventResult -> {
+ .map( indexEventResult -> {
ack( indexEventResult.queueMessage );
return indexEventResult;
} );
@@ -562,21 +567,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
public class IndexEventResult{
private final QueueMessage queueMessage;
private final Optional<IndexOperationMessage> indexOperationMessage;
- private final boolean success;
+ private final boolean shouldProcess;
- public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){
+ public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean shouldProcess){
this.queueMessage = queueMessage;
this.indexOperationMessage = indexOperationMessage;
- this.success = success;
+ this.shouldProcess = shouldProcess;
}
public QueueMessage getQueueMessage() {
return queueMessage;
}
- public boolean success() {
- return success;
+ public boolean shouldProcess() {
+ return shouldProcess;
}
public Optional<IndexOperationMessage> getIndexOperationMessage() {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 4f35730..e093d7d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -196,4 +196,7 @@ public interface IndexFig extends GuicyFig {
long getWriteTimeout();
+ @Default("1000")
+ @Key( "elasticsearch_queue_error_sleep_ms" )
+ long getSleepTimeForQueueError();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 869b75a..828027c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -205,6 +205,13 @@ public class EsIndexProducerImpl implements IndexProducer {
}
if ( error ) {
+ if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){
+ try{
+ Thread.sleep(indexFig.getSleepTimeForQueueError());
+ }catch (InterruptedException ie){
+ //move on
+ }
+ }
throw new RuntimeException(
"Error during processing of bulk index operations one of the responses failed. \n" + errorString);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index d0ed1ef..a1940d0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -47,7 +47,7 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory {
}
@Override
public QueueManager getQueueManager(QueueScope scope) {
- if(queueFig.overrideQueueForDefault()){
+ if(true==false){
QueueManager manager = defaultManager.get(scope.getName());
if(manager==null){
manager = new DefaultQueueManager();