You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/12 22:59:52 UTC

incubator-usergrid git commit: Fixes subscription death on error

Repository: incubator-usergrid
Updated Branches:
  refs/heads/subscription-bugfixes [created] 3f821f580


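Fixes subscription death on error

Removes the doOnError logging callbacks ("Unable to process batches",
"Unable to ack futures") from the observable chains in
EsIndexBufferConsumerImpl, and wraps subscriber.onNext( indexOp ) in
send() in a try/catch that logs the failing indexOp, so that an exception
raised while handling one message is logged there rather than propagating
out of send().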


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3f821f58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3f821f58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3f821f58

Branch: refs/heads/subscription-bugfixes
Commit: 3f821f5803c467b45148c51ba192fd957146dc1e
Parents: e3a4a95
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Aug 12 14:59:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Aug 12 14:59:46 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java          | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3f821f58/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 457a900..ed70c62 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -177,7 +177,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     batchOperation.doOperation( client, bulkRequestBuilder );
                 } ) )
                 //write them
-            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).doOnError( t -> log.error( "Unable to process batches", t ) );
+            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) );
 
 
         //now that we've processed them all, ack the futures after our last batch comes through
@@ -194,9 +194,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
         //mark this as done
         return processedIndexOperations.doOnNext( processedIndexOp -> {
                 processedIndexOp.done();
-                roundtripTimer.update(System.currentTimeMillis() - processedIndexOp.getCreationTime());
-            }
-        ).doOnError(t -> log.error("Unable to ack futures", t));
+                roundtripTimer.update( System.currentTimeMillis() - processedIndexOp.getCreationTime() );
+            } );
     }
 
 
@@ -270,7 +269,11 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
          * Send the data through the buffer
          */
         public void send( final IndexOperationMessage indexOp ) {
-            subscriber.onNext( indexOp );
+            try {
+                subscriber.onNext( indexOp );
+            }catch(Exception e){
+                log.error( "Unable to process message for indexOp {}, error follows.", indexOp, e );
+            }
         }