You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2017/04/20 12:15:40 UTC
cxf git commit: [CXF-6576] Handle exceptions in MessageListener
container without using setExceptionListener
Repository: cxf
Updated Branches:
refs/heads/jms-exception-handling [created] 30f0743d0
[CXF-6576] Handle exceptions in MessageListener container without using setExceptionListener
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/30f0743d
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/30f0743d
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/30f0743d
Branch: refs/heads/jms-exception-handling
Commit: 30f0743d01a3b3060b35622870de6481c098742a
Parents: b43dfb9
Author: Christian Schneider <ch...@die-schneider.net>
Authored: Thu Apr 20 14:15:29 2017 +0200
Committer: Christian Schneider <ch...@die-schneider.net>
Committed: Thu Apr 20 14:15:29 2017 +0200
----------------------------------------------------------------------
.../cxf/transport/jms/JMSDestination.java | 7 +-
.../util/PollingMessageListenerContainer.java | 80 +++++---------------
.../transport/jms/util/MessageListenerTest.java | 9 ++-
3 files changed, 29 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
index 2d7f4db..9794cd5 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
@@ -118,19 +118,20 @@ public class JMSDestination extends AbstractMultiplexDestination implements Mess
Session session = null;
try {
connection = JMSFactory.createConnection(jmsConfig);
- connection.setExceptionListener(new ExceptionListener() {
+ ExceptionListener exListener = new ExceptionListener() {
public void onException(JMSException exception) {
if (!shutdown) {
LOG.log(Level.WARNING, "Exception on JMS connection. Trying to reconnect", exception);
restartConnection();
}
}
- });
+ };
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = jmsConfig.getTargetDestination(session);
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection,
- destination, this);
+ destination,
+ this, exListener);
container.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
container.setTransactionManager(jmsConfig.getTransactionManager());
container.setMessageSelector(jmsConfig.getMessageSelector());
http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index c4276eb..815bcf1 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -36,12 +37,14 @@ import org.apache.cxf.common.logging.LogUtils;
public class PollingMessageListenerContainer extends AbstractMessageListenerContainer {
private static final Logger LOG = LogUtils.getL7dLogger(PollingMessageListenerContainer.class);
+ private ExceptionListener exceptionListener;
public PollingMessageListenerContainer(Connection connection, Destination destination,
- MessageListener listenerHandler) {
+ MessageListener listenerHandler, ExceptionListener exceptionListener) {
this.connection = connection;
this.destination = destination;
this.listenerHandler = listenerHandler;
+ this.exceptionListener = exceptionListener;
}
private class Poller extends AbstractPoller implements Runnable {
@@ -49,11 +52,10 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
@Override
public void run() {
Session session = null;
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
- // Create session early to optimize performance
+ // Create session early to optimize performance // In
session = closer.register(connection.createSession(transacted, acknowledgeMode));
MessageConsumer consumer = closer.register(createConsumer(session));
while (running) {
@@ -70,14 +72,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
safeRollBack(session);
}
}
- } catch (Throwable e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ } catch (Exception e) {
+ handleException(e);
}
}
-
}
- @Override
protected void safeRollBack(Session session) {
try {
if (session != null && session.getTransacted()) {
@@ -94,7 +94,6 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
@Override
public void run() {
- init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
@@ -121,14 +120,12 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
safeRollBack(session);
}
} catch (Exception e) {
- catchUnexpectedExceptionDuringPolling(null, e);
+ handleException(e);
}
-
}
}
- @Override
protected void safeRollBack(Session session) {
try {
transactionManager.rollback();
@@ -140,61 +137,18 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
private abstract class AbstractPoller {
- private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
- private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
- protected int retryCounter = -1;
- protected int counter;
- protected int sleepingTime = 5000;
-
- protected void init() {
- if (jndiEnvironment != null) {
- if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
- retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
- }
- if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
- sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
- }
- }
- }
-
- protected boolean hasToCount() {
- return retryCounter > -1;
- }
-
- protected boolean hasToStop() {
- return counter > retryCounter;
- }
-
- protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) {
+ protected void handleException(Exception e) {
LOG.log(Level.WARNING, "Unexpected exception.", e);
- if (hasToCount()) {
- counter++;
- if (hasToStop()) {
- stop(session, e);
- }
- }
- if (running) {
- try {
- String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
- log += hasToCount()
- ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter
- : "";
- LOG.log(Level.WARNING, log);
- Thread.sleep(sleepingTime);
- } catch (InterruptedException e1) {
- LOG.log(Level.WARNING, e1.getMessage());
- }
- }
- }
-
- protected void stop(Session session, Throwable e) {
- LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
- safeRollBack(session);
running = false;
+ JMSException wrapped;
+ if (e instanceof JMSException) {
+ wrapped = (JMSException) e;
+ } else {
+ wrapped = new JMSException("");
+ wrapped.addSuppressed(e);
+ }
+ PollingMessageListenerContainer.this.exceptionListener.onException(wrapped);
}
-
- protected abstract void safeRollBack(Session session);
-
}
private MessageConsumer createConsumer(Session session) throws JMSException {
http://git-wip-us.apache.org/repos/asf/cxf/blob/30f0743d/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index 82cc37a..4adf921 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.util;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -52,8 +53,14 @@ public class MessageListenerTest {
Queue dest = JMSUtil.createQueue(connection, "test");
MessageListener listenerHandler = new TestMessageListener();
+ ExceptionListener exListener = new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ }
+ };
PollingMessageListenerContainer container = new PollingMessageListenerContainer(connection, dest,
- listenerHandler);
+ listenerHandler, exListener);
container.setTransacted(false);
container.setAcknowledgeMode(Session.SESSION_TRANSACTED);