You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2017/02/22 08:06:38 UTC

cxf git commit: [CXF-7197]Loop on exception during polling message on JMS queue

Repository: cxf
Updated Branches:
  refs/heads/master 8fb21e240 -> 1552f8811


[CXF-7197]Loop on exception during polling message on JMS queue


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1552f881
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1552f881
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1552f881

Branch: refs/heads/master
Commit: 1552f881183ba35b3706dc3c8061d7c6d85c7238
Parents: 8fb21e2
Author: Freeman Fang <fr...@gmail.com>
Authored: Wed Feb 22 16:06:31 2017 +0800
Committer: Freeman Fang <fr...@gmail.com>
Committed: Wed Feb 22 16:06:31 2017 +0800

----------------------------------------------------------------------
 .../util/PollingMessageListenerContainer.java   | 81 +++++++++++++++++---
 1 file changed, 72 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/1552f881/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index ad8fdbf..8096eb3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -44,15 +44,17 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         this.listenerHandler = listenerHandler;
     }
 
-    private class Poller implements Runnable {
+    private class Poller extends AbstractPoller implements Runnable {
 
         @Override
         public void run() {
+            Session session = null;
+            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
                     // Create session early to optimize performance
-                    Session session = closer.register(connection.createSession(transacted, acknowledgeMode));
+                    session = closer.register(connection.createSession(transacted, acknowledgeMode));
                     MessageConsumer consumer = closer.register(createConsumer(session));
                     while (running) {
                         Message message = consumer.receive(1000);
@@ -65,19 +67,20 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                             }
                         } catch (Exception e) {
                             LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e);
-                            safeRollBack(session, e);
+                            safeRollBack(session);
                         }
                     }
                 } catch (Exception e) {
-                    LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+                    catchUnexpectedExceptionDuringPolling(null, e);
                 }
             }
 
         }
 
-        private void safeRollBack(Session session, Exception e) {
+        @Override
+        protected void safeRollBack(Session session) {
             try {
-                if (session.getTransacted()) {
+                if (session != null && session.getTransacted()) {
                     session.rollback();
                 }
             } catch (Exception e1) {
@@ -87,10 +90,11 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
     }
 
-    private class XAPoller implements Runnable {
+    private class XAPoller extends AbstractPoller implements Runnable {
 
         @Override
         public void run() {
+            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
@@ -117,14 +121,15 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                         safeRollBack(session);
                     }
                 } catch (Exception e) {
-                    LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+                    catchUnexpectedExceptionDuringPolling(null, e);
                 }
 
             }
 
         }
 
-        private void safeRollBack(Session session) {
+        @Override
+        protected void safeRollBack(Session session) {
             try {
                 transactionManager.rollback();
             } catch (Exception e) {
@@ -134,6 +139,64 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
     }
 
+    private abstract class AbstractPoller {
+        private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
+        private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
+        protected int retryCounter = -1;
+        protected int counter;
+        protected int sleepingTime = 5000;
+
+        protected void init() {
+            if (jndiEnvironment != null) {
+                if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
+                    retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
+                }
+                if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
+                    sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
+                }
+            }
+        }
+
+        protected boolean hasToCount() {
+            return retryCounter > -1;
+        }
+
+        protected boolean hasToStop() {
+            return counter > retryCounter;
+        }
+
+        protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) {
+            LOG.log(Level.WARNING, "Unexpected exception.", e);
+            if (hasToCount()) {
+                counter++;
+                if (hasToStop()) {
+                    stop(session, e);
+                }
+            }
+            if (running) {
+                try {
+                    String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
+                    log += hasToCount()
+                        ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter
+                        : "";
+                    LOG.log(Level.WARNING, log);
+                    Thread.sleep(sleepingTime);
+                } catch (InterruptedException e1) {
+                    LOG.log(Level.WARNING, e1.getMessage());
+                }
+            }
+        }
+
+        protected void stop(Session session, Throwable e) {
+            LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
+            safeRollBack(session);
+            running = false;
+        }
+
+        protected abstract void safeRollBack(Session session);
+
+    }
+    
     private MessageConsumer createConsumer(Session session) throws JMSException {
         if (durableSubscriptionName != null && destination instanceof Topic) {
             return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,