You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/24 17:34:17 UTC

[1/6] git commit: simple counting;fix test; add lock manager

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-candidate [created] e0f7b490a


simple counting;fix test; add lock manager


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ec39c7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ec39c7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ec39c7ad

Branch: refs/heads/two-dot-o-candidate
Commit: ec39c7ad4f78f70de0520a060aabb79f8e305632
Parents: 66fd523
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 22 17:22:09 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 22 17:22:09 2014 -0600

----------------------------------------------------------------------
 .../main/resources/usergrid-core-context.xml    |  6 +-
 .../notifications/SingleQueueTaskManager.java   | 94 ++++++++------------
 .../AbstractServiceNotificationIT.java          | 24 +++--
 3 files changed, 56 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec39c7ad/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 7abc9e6..28d118c 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -70,16 +70,16 @@
     <bean id="loadBalancingPolicy" class="me.prettyprint.cassandra.connection.DynamicLoadBalancingPolicy"/>
 
 	<!--  locking for a single node -->	
-	<bean name="lockManager" class="org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl" />
+	<!--<bean name="lockManager" class="org.apache.usergrid.locking.singlenode.SingleNodeLockManagerImpl" />-->
 	
 	<!--  hector based locks -->
 	<!-- Note that if this is deployed in a production cluster, the RF on the keyspace MUST be updated to use an odd number for it's replication Factor.
 		  Even numbers can potentially case the locks to fail, via "split brain" when read at QUORUM on lock verification-->
 	
-	<!--  <bean name="lockManager" class="org.apache.usergrid.locking.cassandra.HectorLockManagerImpl" >
+	<bean name="lockManager" class="org.apache.usergrid.locking.cassandra.HectorLockManagerImpl" >
 		<property name="cluster" ref="cassandraCluster"/>
 		<property name="keyspaceName" value="${cassandra.lock.keyspace}"/>
-	</bean>-->
+	</bean>
 	
 	<!--  zookeeper locks -->
 	<!--

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec39c7ad/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
index 4b11974..beb9c70 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/SingleQueueTaskManager.java
@@ -123,7 +123,7 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
             receipt.setUuid(savedReceipt.getUuid());
 
             List<EntityRef> entities = Arrays.asList(notification, device);
-            em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
+//            em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
         } else {
             em.update(receipt);
         }
@@ -145,68 +145,46 @@ public class SingleQueueTaskManager implements NotificationsTaskManager {
     }
 
     public void finishedBatch() throws Exception {
-        //synchronized (this) { //avoid issues with counting
-            long successes = this.successes.getAndSet(0); //reset counters
-            long failures = this.failures.getAndSet(0); //reset counters
-            this.hasFinished = true;
-
-            // refresh notification
-            Notification notification = em.get(this.notification.getUuid(), Notification.class);
-            notification.setModified(System.currentTimeMillis());
-
-            Map<String, Object> properties;
-            Map<String, Long> stats;
-            String statsKey = "statistics_batch";
-
-            //write out current results to a set so no overlap in multiple writes will occur
-            if (successes + failures > 0) {
-                properties = new HashMap<String, Object>(4);
-                stats = new HashMap<String, Long>(2);
-                stats.put("sent", successes);
-                stats.put("errors", failures);
-                properties.put(statsKey + "_" + System.currentTimeMillis(), stats);
-                properties.put("modified", notification.getModified());
-                em.updateProperties(notification, properties);
-            }
-
-            //resum the stats
-            properties = em.getProperties(notification); // re-read
-            long sent = 0;
-            long errors = 0;
-            for (String key : properties.keySet()) {
-                if (key.contains(statsKey)) {
-                    stats = (Map<String, Long>) properties.get(key);
-                    sent += stats.get("sent");
-                    errors += stats.get("errors");
-                }
-            }
-
-            //and write them out again, this will produce the most accurate count
-            stats = new HashMap<String, Long>(2);
-            stats.put("sent", sent);
-            stats.put("errors", errors);
-            notification.setStatistics(stats);
-
-            LOG.info("notification {} sending to {}", notification.getUuid(), sent + errors);
-
-            //none of this is known and should you ever do this
-            if (notification.getExpectedCount() <= (errors + sent)) {
-                notification.setFinished(notification.getModified());
-                properties.put("finished", notification.getModified());
-                properties.put("state", notification.getState());
-                LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
-            }
+        long successes = this.successes.getAndSet(0); //reset counters
+        long failures = this.failures.getAndSet(0); //reset counters
+        this.hasFinished = true;
+
+        // refresh notification
+        Notification notification = em.get(this.notification.getUuid(), Notification.class);
+        notification.setModified(System.currentTimeMillis());
+
+        Map<String, Long> stats;
+        Map<String, Object> properties;
+        //resum the stats
+        properties = notification.getProperties();
+        long sent = successes;
+        long errors = failures;
+        //and write them out again, this will produce the most accurate count
+        stats = new HashMap<String, Long>(2);
+        stats.put("sent", sent);
+        stats.put("errors", errors);
+        notification.setStatistics(stats);
+
+        LOG.info("notification {} sending to {}", notification.getUuid(), sent + errors);
+
+        //none of this is known and should you ever do this
+        if (notification.getExpectedCount() <= (errors + sent)) {
+            notification.setFinished(notification.getModified());
+            properties.put("finished", notification.getModified());
+            properties.put("state", notification.getState());
+            LOG.info("done sending to devices in {} ms", notification.getFinished() - notification.getStarted());
+            notification.addProperties(properties);
+        }
 
-            LOG.info("notification finished batch: {}", notification.getUuid());
-            em.updateProperties(notification, properties);
-            em.update(notification);
-       // }
+        LOG.info("notification finished batch: {}", notification.getUuid());
+        em.update(notification);
 
-        //Set<Notifier> notifiers = new HashSet<Notifier>(proxy.getNotifierMap().values()); // remove dups
-       // proxy.asyncCheckForInactiveDevices(notifiers);
+        // Set<Notifier> notifiers = new HashSet<Notifier>(proxy.getNotifierMap().values()); // remove dups
+        //proxy.asyncCheckForInactiveDevices(notifiers);
     }
 
 
+
     protected void hasFinished(boolean hasFinished) {
         this.hasFinished = hasFinished;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ec39c7ad/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
index 8dcbdaf..6092e0a 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/AbstractServiceNotificationIT.java
@@ -27,6 +27,7 @@ import org.junit.Rule;
 import org.junit.rules.TestName;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -68,8 +69,7 @@ public class AbstractServiceNotificationIT extends AbstractServiceIT {
         while (System.currentTimeMillis() < timeout) {
             Thread.sleep(200);
             app.getEm().refreshIndex();
-            notification = app.getEm().get(notification.getUuid(),
-                    Notification.class);
+            notification = app.getEm().get(notification.getUuid(), Notification.class);
             if (notification.getFinished() != null) {
                 return notification;
             }
@@ -80,17 +80,27 @@ public class AbstractServiceNotificationIT extends AbstractServiceIT {
 
     protected List<EntityRef> getNotificationReceipts(EntityRef notification)
             throws Exception {
-        Results r = app.getEm().getCollection(notification,
-                Notification.RECEIPTS_COLLECTION, null, 1000000,
-                Query.Level.REFS, false);
+        Query query = new Query();
+        query.setCollection("receipts");
+        query.setLimit(100);
+        PathQuery<Receipt> pathQuery = new PathQuery<Receipt>(
+                new SimpleEntityRef(app.getEm().getApplicationRef()),
+                query
+        );
+        Iterator<Receipt> it = pathQuery.iterator(app.getEm());
         List<EntityRef> list =new ArrayList<EntityRef>();//get all
-        PagingResultsIterator it = new PagingResultsIterator(r);
         while(it.hasNext()){
-            list.add((EntityRef)it.next());
+            Receipt receipt =it.next();
+
+            if(receipt.getNotificationUUID().equals(notification.getUuid())) {
+                list.add(receipt);
+            }
         }
         return list;
     }
 
+
+
     protected void checkReceipts(Notification notification, int expected)
             throws Exception {
         List<EntityRef> receipts = getNotificationReceipts(notification);


[2/6] git commit: add exception logging

Posted by sf...@apache.org.
add exception logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/807583dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/807583dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/807583dd

Branch: refs/heads/two-dot-o-candidate
Commit: 807583dddeb31c7e56427ea2b88ac1c2e4cd5299
Parents: ec39c7a
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Sep 22 18:47:34 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Sep 22 18:47:34 2014 -0600

----------------------------------------------------------------------
 .../usergrid/services/notifications/ApplicationQueueManager.java    | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/807583dd/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index ee988dc..37f53c4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -343,6 +343,7 @@ public class ApplicationQueueManager implements QueueManager {
                                     providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
                                 } catch (Exception e) {
                                     tracker.failed(0, e.getMessage());
+                                    LOG.warn("failed to send notification with exception",e);
                                 } finally {
                                     LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
                                 }


[3/6] git commit: fix payload parsing

Posted by sf...@apache.org.
fix payload parsing


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/15b66513
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/15b66513
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/15b66513

Branch: refs/heads/two-dot-o-candidate
Commit: 15b66513337b389a1cb228e68f9bad77da3898ca
Parents: 807583d
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 23 08:26:30 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 23 08:26:30 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/entities/Receipt.java  | 20 +++++++++++++++++++-
 .../notifications/ApplicationQueueManager.java  |  2 +-
 2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/15b66513/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
index 1ffc351..1e145ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Receipt.java
@@ -20,8 +20,10 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.usergrid.persistence.TypedEntity;
 
 import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashMap;
 import java.util.UUID;
 import org.apache.usergrid.persistence.annotations.EntityProperty;
+import org.mortbay.util.ajax.JSON;
 
 @XmlRootElement
 public class Receipt extends TypedEntity {
@@ -66,7 +68,23 @@ public class Receipt extends TypedEntity {
     public Receipt(UUID notificationUUID, String notifierId, Object payload,UUID deviceId) {
         this.notificationUUID = notificationUUID;
         this.notifierId = notifierId;
-        this.payload = payload;
+        HashMap receiptPayload;
+        if(! (payload instanceof HashMap) ){
+            if(payload instanceof String){
+                try {
+                    receiptPayload = (HashMap) JSON.parse((String) payload);
+                }catch (Exception e){
+                    receiptPayload = new HashMap<>();
+                    receiptPayload.put("payload", payload);
+                }
+            }else {
+                receiptPayload = new HashMap<>();
+                receiptPayload.put("payload", payload);
+            }
+        }else{
+            receiptPayload = (HashMap)payload;
+        }
+        this.payload = receiptPayload;
         this.setDeviceId(deviceId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/15b66513/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 37f53c4..2bfa332 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -354,7 +354,7 @@ public class ApplicationQueueManager implements QueueManager {
                     }
 
                 } catch (Exception e) {
-                    LOG.error("Failure unknown",e);
+                    LOG.error("Failure while sending",e);
                 }
                 return message;
             }


[5/6] git commit: increment exception count

Posted by sf...@apache.org.
increment exception count


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/95529b1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/95529b1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/95529b1d

Branch: refs/heads/two-dot-o-candidate
Commit: 95529b1df332b22827443f745719f0e059113361
Parents: fd6b73f
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 23 18:55:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 23 18:55:40 2014 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/services/notifications/QueueListener.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/95529b1d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index e7ba00e..6e9a7ef 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -193,7 +193,7 @@ public class QueueListener  {
             }catch (Exception ex){
                 LOG.error("failed to dequeue",ex);
                 try {
-                    long sleeptime = sleepWhenNoneFound*(consecutiveExceptions.get()+1);
+                    long sleeptime = sleepWhenNoneFound*(consecutiveExceptions.incrementAndGet());
                     LOG.info("sleeping due to failures {} ms", sleeptime);
                     Thread.sleep(sleeptime);
                 }catch (InterruptedException ie){


[6/6] git commit: increment queue version

Posted by sf...@apache.org.
increment queue version


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e0f7b490
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e0f7b490
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e0f7b490

Branch: refs/heads/two-dot-o-candidate
Commit: e0f7b490a34247ae8241b415eece4a27a03d7ee6
Parents: 95529b1
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Sep 24 08:47:55 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Sep 24 08:47:55 2014 -0600

----------------------------------------------------------------------
 .../usergrid/services/notifications/ApplicationQueueManager.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e0f7b490/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 2bfa332..75c1f28 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class ApplicationQueueManager implements QueueManager {
 
-    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_12";
+    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_15";
     public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 


[4/6] git commit: adjusting sleep

Posted by sf...@apache.org.
adjusting sleep


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fd6b73f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fd6b73f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fd6b73f3

Branch: refs/heads/two-dot-o-candidate
Commit: fd6b73f33cbbb633f2fab517a9f6761616f341ab
Parents: 15b6651
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 23 18:52:35 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 23 18:53:35 2014 -0600

----------------------------------------------------------------------
 .../services/notifications/QueueListener.java        | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fd6b73f3/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 65d99be..e7ba00e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public static int MAX_CONSECUTIVE_FAILS = 10;
+    public static int MAX_CONSECUTIVE_FAILS = 10000;
 
     public static final long MESSAGE_TRANSACTION_TIMEOUT = 60 * 5 * 1000;
 
@@ -180,11 +180,12 @@ public class QueueListener  {
                     LOG.info("QueueListener: sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
 
                     if(sleepBetweenRuns > 0) {
+                        LOG.info("QueueListener: sleep between rounds...sleep...{}", sleepBetweenRuns);
                         Thread.sleep(sleepBetweenRuns);
                     }
                 }
                 else{
-                    LOG.info("QueueListener: no messages...sleep...", results.size());
+                    LOG.info("QueueListener: no messages...sleep...{}", sleepWhenNoneFound);
                     Thread.sleep(sleepWhenNoneFound);
                 }
                 //send to the providers
@@ -192,13 +193,11 @@ public class QueueListener  {
             }catch (Exception ex){
                 LOG.error("failed to dequeue",ex);
                 try {
-                    Thread.sleep(sleepWhenNoneFound);
+                    long sleeptime = sleepWhenNoneFound*(consecutiveExceptions.get()+1);
+                    LOG.info("sleeping due to failures {} ms", sleeptime);
+                    Thread.sleep(sleeptime);
                 }catch (InterruptedException ie){
-                    LOG.info("sleep interupted");
-                }
-                if(consecutiveExceptions.getAndIncrement() > MAX_CONSECUTIVE_FAILS){
-                    LOG.error("killing message listener; too many failures");
-                    break;
+                    LOG.info("sleep interrupted");
                 }
             }
         }