You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2017/04/20 12:15:40 UTC

cxf git commit: [CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener

Repository: cxf
Updated Branches:
  refs/heads/jms-exception-handling [created] 30f0743d0


[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener


Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/30f0743d
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/30f0743d
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/30f0743d

Branch: refs/heads/jms-exception-handling
Commit: 30f0743d01a3b3060b35622870de6481c098742a
Parents: b43dfb9
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Thu Apr 20 14:15:29 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Thu Apr 20 14:15:29 2017 +0200

----------------------------------------------------------------------
 .../cxf/transport/jms/JMSDestination.java       |  7 +-
 .../util/PollingMessageListenerContainer.java   | 80 +++++---------------
 .../transport/jms/util/MessageListenerTest.java |  9 ++-
 3 files changed, 29 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 2d7f4db..9794cd5 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
         Session session = null;
         try {
             connection = JMSFactory.createConnection(jmsConfig);
-            connection.setExceptionListener(new ExceptionListener() {
+            ExceptionListener exListener = new ExceptionListener() {
                 public void onException(JMSException exception) {
                     if (!shutdown) {
                         LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception);
                         restartConnection();
                     }
                 }
-            });
+            };
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = jmsConfig.getTargetDestination(session);
 
             PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
-                                                                                            destination, this);
+                                                                                            destination, 
+                                                                                            this, exListener);
             container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
             container.setTransactionManager(jmsConfig.getTransactionManager());
             container.setMessageSelector(jmsConfig.getMessageSelector());

http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index c4276eb..815bcf1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -36,12 +37,14 @@ import org.apache.cxf.common.logging.LogUtils;
 
 public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
     private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
+    private ExceptionListener exceptionListener;
 
     public PollingMessageListenerContainer(Connection connection, Destination destination,
-                                           MessageListener listenerHandler) {
+                                           MessageListener listenerHandler, ExceptionListener exceptionListener) {
         this.connection = connection;
         this.destination = destination;
         this.listenerHandler = listenerHandler;
+        this.exceptionListener = exceptionListener;
     }
 
     private class Poller extends AbstractPoller implements Runnable {
@@ -49,11 +52,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
         @Override
         public void run() {
             Session session = null;
-            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
-                    // Create session early to optimize performance
+                    // Create session early to optimize performance
                     session = closer.register(connection.createSession(transacted, acknowledgeMode));
                     MessageConsumer consumer = closer.register(createConsumer(session));
                     while (running) {
@@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                             safeRollBack(session);
                         }
                     }
-                } catch (Throwable e) {
-                    catchUnexpectedExceptionDuringPolling(null, e);
+                } catch (Exception e) {
+                    handleException(e);
                 }
             }
-
         }
 
-        @Override
         protected void safeRollBack(Session session) {
             try {
                 if (session != null && session.getTransacted()) {
@@ -94,7 +94,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
 
         @Override
         public void run() {
-            init();
             while (running) {
                 try (ResourceCloser closer = new ResourceCloser()) {
                     closer.register(createInitialContext());
@@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
                         safeRollBack(session);
                     }
                 } catch (Exception e) {
-                    catchUnexpectedExceptionDuringPolling(null, e);
+                    handleException(e);
                 }
-
             }
 
         }
 
-        @Override
         protected void safeRollBack(Session session) {
             try {
                 transactionManager.rollback();
@@ -140,61 +137,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
     }
 
     private abstract class AbstractPoller {
-        private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
-        private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
-        protected int retryCounter = -1;
-        protected int counter;
-        protected int sleepingTime = 5000;
-
-        protected void init() {
-            if (jndiEnvironment != null) {
-                if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
-                    retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
-                }
-                if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
-                    sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
-                }
-            }
-        }
-
-        protected boolean hasToCount() {
-            return retryCounter > -1;
-        }
-
-        protected boolean hasToStop() {
-            return counter > retryCounter;
-        }
-
-        protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) {
+        protected void handleException(Exception e) {
             LOG.log(Level.WARNING, "Unexpected exception.", e);
-            if (hasToCount()) {
-                counter++;
-                if (hasToStop()) {
-                    stop(session, e);
-                }
-            }
-            if (running) {
-                try {
-                    String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
-                    log += hasToCount()
-                        ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter
-                        : "";
-                    LOG.log(Level.WARNING, log);
-                    Thread.sleep(sleepingTime);
-                } catch (InterruptedException e1) {
-                    LOG.log(Level.WARNING, e1.getMessage());
-                }
-            }
-        }
-
-        protected void stop(Session session, Throwable e) {
-            LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
-            safeRollBack(session);
             running = false;
+            JMSException wrapped;
+            if (e  instanceof JMSException) {
+                wrapped = (JMSException) e;
+            } else {
+                wrapped = new JMSException("");
+                wrapped.addSuppressed(e);
+            }
+            PollingMessageListenerContainer.this.exceptionListener.onException(wrapped);
         }
-
-        protected abstract void safeRollBack(Session session);
-
     }
     
     private MessageConsumer createConsumer(Session session) throws JMSException {

http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 82cc37a..4adf921 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -52,8 +53,14 @@ public class MessageListenerTest {
         Queue dest = JMSUtil.createQueue(connection, "test");
 
         MessageListener listenerHandler = new TestMessageListener();
+        ExceptionListener exListener = new ExceptionListener() {
+            
+            @Override
+            public void onException(JMSException exception) {
+            }
+        };
         PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
-                                                                                        listenerHandler);
+                                                                                        listenerHandler, exListener);
         container.setTransacted(false);
         container.setAcknowledgeMode(Session.SESSION_TRANSACTED);