You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/26 16:47:56 UTC
incubator-usergrid git commit: Fixes issue with throwable not being caught in onSubscribe function
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o beb2a2a53 -> 2d1c8b8ac
Fixes issue with throwable not being caught in onSubscribe function
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d1c8b8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d1c8b8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d1c8b8a
Branch: refs/heads/two-dot-o
Commit: 2d1c8b8ac7b20b63a11d83adca56839d8b409cca
Parents: beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:47:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:47:58 2015 -0600
----------------------------------------------------------------------
.../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++++-------------
1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d1c8b8a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 7e64de3..d064b97 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -153,12 +153,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
- List<IndexOperationMessage> drainList;
+
+ List<IndexOperationMessage> drainList = null;
+
do {
- try {
+ Timer.Context timer = produceTimer.time();
+
+
+ try {
- Timer.Context timer = produceTimer.time();
drainList = bufferQueue
.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
@@ -174,10 +178,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
timer.stop();
}
- catch ( Exception e ) {
+ catch ( Throwable t ) {
final long sleepTime = config.getFailureRetryTime();
- log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, e );
+ log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
+
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
+ }
+
try {
Thread.sleep( sleepTime );
@@ -216,26 +225,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
inFlight.addAndGet( -1 * indexOperationMessages.size() );
}
} )
- //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
- .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
- @Override
- public List<IndexOperationMessage> call( final Throwable throwable ) {
- final long sleepTime = config.getFailureRetryTime();
-
- log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, throwable );
-
- try {
- Thread.sleep( sleepTime );
- }
- catch ( InterruptedException ie ) {
- //swallow
- }
-
- indexErrorCounter.inc();
-
- return Collections.EMPTY_LIST;
- }
- } )
.subscribeOn( Schedulers.newThread() );