You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ff...@apache.org on 2017/02/22 08:06:38 UTC
cxf git commit: [CXF-7197] Loop on exception while polling messages on
JMS queue
Repository: cxf
Updated Branches:
refs/heads/master 8fb21e240 -> 1552f8811
[CXF-7197] Loop on exception while polling messages on JMS queue
Project: http://git-wip-us.apache.org/repos/asf/cxf/repo
Commit: http://git-wip-us.apache.org/repos/asf/cxf/commit/1552f881
Tree: http://git-wip-us.apache.org/repos/asf/cxf/tree/1552f881
Diff: http://git-wip-us.apache.org/repos/asf/cxf/diff/1552f881
Branch: refs/heads/master
Commit: 1552f881183ba35b3706dc3c8061d7c6d85c7238
Parents: 8fb21e2
Author: Freeman Fang <fr...@gmail.com>
Authored: Wed Feb 22 16:06:31 2017 +0800
Committer: Freeman Fang <fr...@gmail.com>
Committed: Wed Feb 22 16:06:31 2017 +0800
----------------------------------------------------------------------
.../util/PollingMessageListenerContainer.java | 81 +++++++++++++++++---
1 file changed, 72 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cxf/blob/1552f881/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
index ad8fdbf..8096eb3 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/PollingMessageListenerContainer.java
@@ -44,15 +44,17 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
this.listenerHandler = listenerHandler;
}
- private class Poller implements Runnable {
+ private class Poller extends AbstractPoller implements Runnable {
@Override
public void run() {
+ Session session = null;
+ init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
// Create session early to optimize performance
- Session session = closer.register(connection.createSession(transacted, acknowledgeMode));
+ session = closer.register(connection.createSession(transacted, acknowledgeMode));
MessageConsumer consumer = closer.register(createConsumer(session));
while (running) {
Message message = consumer.receive(1000);
@@ -65,19 +67,20 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
} catch (Exception e) {
LOG.log(Level.WARNING, "Exception while processing jms message in cxf. Rolling back", e);
- safeRollBack(session, e);
+ safeRollBack(session);
}
}
} catch (Exception e) {
- LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+ catchUnexpectedExceptionDuringPolling(null, e);
}
}
}
- private void safeRollBack(Session session, Exception e) {
+ @Override
+ protected void safeRollBack(Session session) {
try {
- if (session.getTransacted()) {
+ if (session != null && session.getTransacted()) {
session.rollback();
}
} catch (Exception e1) {
@@ -87,10 +90,11 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
- private class XAPoller implements Runnable {
+ private class XAPoller extends AbstractPoller implements Runnable {
@Override
public void run() {
+ init();
while (running) {
try (ResourceCloser closer = new ResourceCloser()) {
closer.register(createInitialContext());
@@ -117,14 +121,15 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
safeRollBack(session);
}
} catch (Exception e) {
- LOG.log(Level.WARNING, "Unexpected exception. Restarting session and consumer", e);
+ catchUnexpectedExceptionDuringPolling(null, e);
}
}
}
- private void safeRollBack(Session session) {
+ @Override
+ protected void safeRollBack(Session session) {
try {
transactionManager.rollback();
} catch (Exception e) {
@@ -134,6 +139,64 @@ public class PollingMessageListenerContainer extends AbstractMessageListenerCont
}
+ private abstract class AbstractPoller {
+ private static final String RETRY_COUNTER_ON_EXCEPTION = "jms.polling.retrycounteronexception";
+ private static final String SLEEPING_TIME_BEFORE_RETRY = "jms.polling.sleepingtimebeforeretry";
+ protected int retryCounter = -1;
+ protected int counter;
+ protected int sleepingTime = 5000;
+
+ protected void init() {
+ if (jndiEnvironment != null) {
+ if (jndiEnvironment.containsKey(RETRY_COUNTER_ON_EXCEPTION)) {
+ retryCounter = Integer.valueOf(jndiEnvironment.getProperty(RETRY_COUNTER_ON_EXCEPTION));
+ }
+ if (jndiEnvironment.containsKey(SLEEPING_TIME_BEFORE_RETRY)) {
+ sleepingTime = Integer.valueOf(jndiEnvironment.getProperty(SLEEPING_TIME_BEFORE_RETRY));
+ }
+ }
+ }
+
+ protected boolean hasToCount() {
+ return retryCounter > -1;
+ }
+
+ protected boolean hasToStop() {
+ return counter > retryCounter;
+ }
+
+ protected void catchUnexpectedExceptionDuringPolling(Session session, Throwable e) {
+ LOG.log(Level.WARNING, "Unexpected exception.", e);
+ if (hasToCount()) {
+ counter++;
+ if (hasToStop()) {
+ stop(session, e);
+ }
+ }
+ if (running) {
+ try {
+ String log = "Now sleeping for " + sleepingTime / 1000 + " seconds";
+ log += hasToCount()
+ ? ". Then restarting session and consumer: attempt " + counter + "/" + retryCounter
+ : "";
+ LOG.log(Level.WARNING, log);
+ Thread.sleep(sleepingTime);
+ } catch (InterruptedException e1) {
+ LOG.log(Level.WARNING, e1.getMessage());
+ }
+ }
+ }
+
+ protected void stop(Session session, Throwable e) {
+ LOG.log(Level.WARNING, "Stopping the jms message polling thread in cxf", e);
+ safeRollBack(session);
+ running = false;
+ }
+
+ protected abstract void safeRollBack(Session session);
+
+ }
+
private MessageConsumer createConsumer(Session session) throws JMSException {
if (durableSubscriptionName != null && destination instanceof Topic) {
return session.createDurableSubscriber((Topic)destination, durableSubscriptionName,