You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/14 23:50:23 UTC

[08/32] usergrid git commit: filter some ops out, check for queue overflow

filter some ops out, check for queue overflow


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f35d01c7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f35d01c7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f35d01c7

Branch: refs/heads/master
Commit: f35d01c730998d45ca6feb5edf9138f04d185951
Parents: 5f20ece
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Oct 5 13:59:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Oct 5 13:59:10 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 31 ++++++++++++--------
 .../usergrid/persistence/index/IndexFig.java    |  3 ++
 .../index/impl/EsIndexProducerImpl.java         |  7 +++++
 .../queue/impl/QueueManagerFactoryImpl.java     |  2 +-
 4 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 92faed4..e215d48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -239,7 +239,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
             if (event == null) {
                 logger.error("AsyncEvent type or event is null!");
-                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
             }
             try {
                 //merge each operation to a master observable;
@@ -254,10 +254,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
                 } else if (event instanceof InitializeApplicationIndexEvent) {
                     //does not return observable
                     handleInitializeApplicationIndex(message);
-                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
                 } else {
                     logger.error("Unknown EventType: {}", event);
-                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), false));
+                    return Observable.just(new IndexEventResult(message, Optional.<IndexOperationMessage>absent(), true));
                 }
             }catch (Exception e){
                 logger.error("Failed to index entity", e,message);
@@ -270,20 +270,25 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         return masterObservable
             //remove unsuccessful
-            .filter( indexEventResult -> indexEventResult.success() && indexEventResult.getIndexOperationMessage()
-                                                                                       .isPresent() )
+            .filter( indexEventResult -> indexEventResult.shouldProcess()  )
             //take the max
             .buffer( MAX_TAKE )
             //map them to index results and return them
             .flatMap( indexEventResults -> {
                 IndexOperationMessage combined = new IndexOperationMessage();
                 indexEventResults.stream().forEach(
-                    indexEventResult -> combined.ingest( indexEventResult.getIndexOperationMessage().get() ) );
+                    indexEventResult ->{
+                        if(indexEventResult.getIndexOperationMessage().isPresent()) {
+                            combined.ingest(indexEventResult.getIndexOperationMessage().get());
+                        }
+                    } );
+
 
                 //ack after successful completion of the operation.
-                return indexProducer.put( combined ).flatMap( operationResult -> Observable.from( indexEventResults ) )
+                return indexProducer.put( combined )
+                    .flatMap( operationResult -> Observable.from( indexEventResults ) )
                     //ack each message, but only if we didn't error.  If we did, we'll want to log it and
-                                    .map( indexEventResult -> {
+                    .map( indexEventResult -> {
                                         ack( indexEventResult.queueMessage );
                                         return indexEventResult;
                                     } );
@@ -562,21 +567,21 @@ public class AmazonAsyncEventService implements AsyncEventService {
     public class IndexEventResult{
         private final QueueMessage queueMessage;
         private final Optional<IndexOperationMessage> indexOperationMessage;
-        private final boolean success;
+        private final boolean shouldProcess;
 
-        public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean success){
+        public IndexEventResult(QueueMessage queueMessage, Optional<IndexOperationMessage> indexOperationMessage ,boolean shouldProcess){
 
             this.queueMessage = queueMessage;
             this.indexOperationMessage = indexOperationMessage;
-            this.success = success;
+            this.shouldProcess = shouldProcess;
         }
 
         public QueueMessage getQueueMessage() {
             return queueMessage;
         }
 
-        public boolean success() {
-            return success;
+        public boolean shouldProcess() {
+            return shouldProcess;
         }
 
         public Optional<IndexOperationMessage> getIndexOperationMessage() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 4f35730..e093d7d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -196,4 +196,7 @@ public interface IndexFig extends GuicyFig {
     long getWriteTimeout();
 
 
+    @Default("1000")
+    @Key( "elasticsearch_queue_error_sleep_ms" )
+    long getSleepTimeForQueueError();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
index 869b75a..828027c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexProducerImpl.java
@@ -205,6 +205,13 @@ public class EsIndexProducerImpl implements IndexProducer {
         }
 
         if ( error ) {
+            if(errorString.lastIndexOf("rejected execution (queue capacity")>=0){
+                try{
+                    Thread.sleep(indexFig.getSleepTimeForQueueError());
+                }catch (InterruptedException ie){
+                    //move on
+                }
+            }
             throw new RuntimeException(
                 "Error during processing of bulk index operations one of the responses failed. \n" + errorString);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f35d01c7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index d0ed1ef..a1940d0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -47,7 +47,7 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory {
     }
     @Override
     public QueueManager getQueueManager(QueueScope scope) {
-        if(queueFig.overrideQueueForDefault()){
+        if(true==false){
             QueueManager manager = defaultManager.get(scope.getName());
             if(manager==null){
                 manager = new DefaultQueueManager();