You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/26 16:47:56 UTC

incubator-usergrid git commit: Fixes issue with Throwable not being caught in the onSubscribe function

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o beb2a2a53 -> 2d1c8b8ac


Fixes issue with Throwable not being caught in the onSubscribe function


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d1c8b8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d1c8b8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d1c8b8a

Branch: refs/heads/two-dot-o
Commit: 2d1c8b8ac7b20b63a11d83adca56839d8b409cca
Parents: beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:47:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:47:58 2015 -0600

----------------------------------------------------------------------
 .../index/impl/EsIndexBufferConsumerImpl.java   | 39 +++++++-------------
 1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d1c8b8a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 7e64de3..d064b97 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -153,12 +153,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         //name our thread so it's easy to see
                         Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
-                        List<IndexOperationMessage> drainList;
+
+                        List<IndexOperationMessage> drainList = null;
+
                         do {
-                            try {
 
+                            Timer.Context timer = produceTimer.time();
+
+
+                            try {
 
-                                Timer.Context timer = produceTimer.time();
 
                                 drainList = bufferQueue
                                     .take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
@@ -174,10 +178,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                                 timer.stop();
                             }
 
-                            catch ( Exception e ) {
+                            catch ( Throwable t ) {
                                 final long sleepTime = config.getFailureRetryTime();
 
-                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, e );
+                                log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, t );
+
+                                if ( drainList != null ) {
+                                    inFlight.addAndGet( -1 * drainList.size() );
+                                }
+
 
                                 try {
                                     Thread.sleep( sleepTime );
@@ -216,26 +225,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         inFlight.addAndGet( -1 * indexOperationMessages.size() );
                     }
                 } )
-                //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
-                .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
-                    @Override
-                    public List<IndexOperationMessage> call( final Throwable throwable ) {
-                        final long sleepTime = config.getFailureRetryTime();
-
-                        log.error( "Failed to dequeue.  Sleeping for {} milliseconds", sleepTime, throwable );
-
-                        try {
-                            Thread.sleep( sleepTime );
-                        }
-                        catch ( InterruptedException ie ) {
-                            //swallow
-                        }
-
-                        indexErrorCounter.inc();
-
-                        return Collections.EMPTY_LIST;
-                    }
-                } )
 
                 .subscribeOn( Schedulers.newThread() );