You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2009/07/16 21:02:37 UTC
svn commit: r794779 - in /cxf/trunk/rt/transports/jms/src:
main/java/org/apache/cxf/transport/jms/
main/java/org/apache/cxf/transport/jms/continuations/
test/java/org/apache/cxf/transport/jms/
test/java/org/apache/cxf/transport/jms/continuations/ test/...
Author: dkulp
Date: Thu Jul 16 19:02:36 2009
New Revision: 794779
URL: http://svn.apache.org/viewvc?rev=794779&view=rev
Log:
[CXF-2343] Change jms transport throttling to use a disconnect/reconnect
instead of bogus message selector. Patch from Paul Hadrosek applied.
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Thu Jul 16 19:02:36 2009
@@ -81,7 +81,8 @@
private int cacheLevel = DEFAULT_VALUE;
private String cacheLevelName;
private boolean enforceSpec = true;
-
+ private boolean acceptMessagesWhileStopping;
+
//For jms spec.
private String targetService;
private String requestURI;
@@ -386,7 +387,15 @@
public void setReconnectOnException(boolean reconnectOnException) {
this.reconnectOnException = reconnectOnException;
}
-
+
+ public boolean isAcceptMessagesWhileStopping() {
+ return acceptMessagesWhileStopping;
+ }
+
+ public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
+ this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
+ }
+
/**
* Tries to creates a ConnectionFactory from jndi if none was set as a property
* by using the jndConfig. Then it determiens if the connectionFactory should be wrapped
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Thu Jul 16 19:02:36 2009
@@ -159,6 +159,9 @@
+ ", please set cacheLevel to the value less than "
+ " org.springframework.jms.listener.DefaultMessageListenerContainer.CACHE_CONSUMER");
}
+ if (jmsConfig.isAcceptMessagesWhileStopping()) {
+ jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
+ }
String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Thu Jul 16 19:02:36 2009
@@ -33,8 +33,6 @@
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class JMSContinuation implements Continuation {
-
- static final String BOGUS_MESSAGE_SELECTOR = "orgApacheCxfTransportsJmsContinuations='too-many'";
private Bus bus;
private Message inMessage;
@@ -43,8 +41,6 @@
private DefaultMessageListenerContainer jmsListener;
private JMSConfiguration jmsConfig;
- private String currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
-
private Object userObject;
private boolean isNew = true;
@@ -163,14 +159,11 @@
// throttle the flow if there're too many continuation instances in memory
synchronized (continuations) {
modifyList(remove);
- if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
- jmsListener.setMessageSelector(currentMessageSelector);
- currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
- } else if (!remove && continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
- currentMessageSelector = jmsListener.getMessageSelector();
- if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
- jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR);
-
+ if (continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
+ jmsListener.stop();
+ } else {
+ if (!jmsListener.isRunning()) {
+ jmsListener.start();
}
}
}
@@ -185,9 +178,6 @@
}
}
- String getCurrentMessageSelector() {
- return currentMessageSelector;
- }
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Thu Jul 16 19:02:36 2009
@@ -130,6 +130,8 @@
assertEquals("The maxConcurrentConsumer should be set", jmsConfig.getMaxConcurrentConsumers(), 5);
assertEquals("The maxSuspendedContinuations should be set",
jmsConfig.getMaxSuspendedContinuations(), 2);
+ assertTrue("The acceptMessagesWhileStopping should be set to true",
+ jmsConfig.isAcceptMessagesWhileStopping());
assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory());
assertTrue("Should get the instance of ActiveMQConnectionFactory",
jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory);
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Thu Jul 16 19:02:36 2009
@@ -33,6 +33,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.jms.JmsException;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -99,9 +100,9 @@
}
@Test
- public void testThrottleWithMessageSelector() {
+ public void testThrottleWithJmsStartAndStop() {
- DefaultMessageListenerContainer springContainer = new DefaultMessageListenerContainer();
+ DefaultMessageListenerContainerStub springContainer = new DefaultMessageListenerContainerStub();
springContainer.setCacheLevel(2);
JMSConfiguration config = new JMSConfiguration();
config.setMaxSuspendedContinuations(1);
@@ -110,17 +111,17 @@
new TestJMSContinuationWrapper(b, m, observer, continuations,
springContainer, config);
- assertNull(springContainer.getMessageSelector());
- assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
+ assertFalse(springContainer.isStart());
+ assertFalse(springContainer.isStop());
- suspendResumeCheckSelector(cw, springContainer);
+ suspendResumeCheckStartAndStop(cw, config, springContainer);
EasyMock.reset(observer);
- suspendResumeCheckSelector(cw, springContainer);
+ suspendResumeCheckStartAndStop(cw, config, springContainer);
}
- private void suspendResumeCheckSelector(JMSContinuation cw,
- DefaultMessageListenerContainer springContainer) {
+ private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config,
+ DefaultMessageListenerContainerStub springContainer) {
try {
cw.suspend(5000);
fail("SuspendInvocation exception expected");
@@ -129,12 +130,10 @@
}
assertEquals(continuations.size(), 1);
assertSame(continuations.get(0), cw);
+ assertTrue(springContainer.isStop());
assertFalse(cw.suspend(1000));
- assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, springContainer.getMessageSelector());
- assertNull(cw.getCurrentMessageSelector());
-
observer.onMessage(m);
EasyMock.expectLastCall();
EasyMock.replay(observer);
@@ -142,10 +141,8 @@
cw.resume();
assertEquals(continuations.size(), 0);
+ assertTrue(springContainer.isStart());
EasyMock.verify(observer);
-
- assertNull(springContainer.getMessageSelector());
- assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
}
@Test
@@ -191,4 +188,27 @@
return result;
}
}
+
+ private class DefaultMessageListenerContainerStub extends DefaultMessageListenerContainer {
+ private boolean start;
+ private boolean stop;
+
+ public void start() throws JmsException {
+ this.start = true;
+ this.stop = false;
+ }
+
+ public void stop() throws JmsException {
+ this.stop = true;
+ this.start = false;
+ }
+
+ public boolean isStart() {
+ return this.start;
+ }
+
+ public boolean isStop() {
+ return this.stop;
+ }
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=794779&r1=794778&r2=794779&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml Thu Jul 16 19:02:36 2009
@@ -109,6 +109,7 @@
p:concurrentConsumers="3"
p:maxConcurrentConsumers="5"
p:maxSuspendedContinuations="2"
+ p:acceptMessagesWhileStopping="true"
/>
<!--