You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2009/04/17 15:58:21 UTC
svn commit: r766013 - in /cxf/trunk:
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/
rt/transports/j...
Author: sergeyb
Date: Fri Apr 17 13:58:12 2009
New Revision: 766013
URL: http://svn.apache.org/viewvc?rev=766013&view=rev
Log:
CXF-2002 : support for maxSuspendedContinuations in JMS
Added:
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java (with props)
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl (with props)
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java
cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Apr 17 13:58:12 2009
@@ -64,6 +64,7 @@
private int concurrentConsumers = 1;
private int maxConcurrentConsumers = 1;
private int maxConcurrentTasks = 10;
+ private int maxSuspendedContinuations = DEFAULT_VALUE;
private volatile String messageSelector;
private boolean subscriptionDurable;
@@ -318,6 +319,14 @@
public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
this.maxConcurrentConsumers = maxConcurrentConsumers;
}
+
+ /**
+ * Returns the limit on the number of suspended continuations.
+ * In this commit JMSDestination registers a continuation provider only when the value
+ * is not 0, and JMSContinuation skips selector-based throttling when it is negative.
+ *
+ * @return the current maxSuspendedContinuations value (initialised to DEFAULT_VALUE)
+ */
+ public int getMaxSuspendedContinuations() {
+ return maxSuspendedContinuations;
+ }
+
+ /**
+ * Sets the limit on the number of suspended continuations.
+ *
+ * @param maxSuspendedContinuations the new limit; stored as given, no validation is done here
+ */
+ public void setMaxSuspendedContinuations(int maxSuspendedContinuations) {
+ this.maxSuspendedContinuations = maxSuspendedContinuations;
+ }
public TaskExecutor getTaskExecutor() {
return taskExecutor;
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Apr 17 13:58:12 2009
@@ -176,12 +176,15 @@
inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
inMessage.setDestination(this);
-
- inMessage.put(ContinuationProvider.class.getName(),
- new JMSContinuationProvider(bus,
- inMessage,
- incomingObserver,
- continuations));
+ if (jmsConfig.getMaxSuspendedContinuations() != 0) {
+ inMessage.put(ContinuationProvider.class.getName(),
+ new JMSContinuationProvider(bus,
+ inMessage,
+ incomingObserver,
+ continuations,
+ jmsListener,
+ jmsConfig));
+ }
BusFactory.setThreadDefaultBus(bus);
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Apr 17 13:58:12 2009
@@ -18,6 +18,8 @@
*/
package org.apache.cxf.transport.jms;
+import java.util.logging.Logger;
+
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -26,6 +28,7 @@
import javax.jms.Session;
import javax.naming.NamingException;
+import org.apache.cxf.common.logging.LogUtils;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;
@@ -40,6 +43,8 @@
*/
public final class JMSFactory {
+ private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class);
+
private JMSFactory() {
}
@@ -141,6 +146,12 @@
} else if (jmsConfig.getCacheLevel() != JMSConfiguration.DEFAULT_VALUE) {
jmsListener.setCacheLevel(jmsConfig.getCacheLevel());
}
+        // JMSContinuation skips its selector-based throttling at CACHE_CONSUMER and above,
+        // so tell the user that the configured limit will have no effect.
+        if (jmsListener.getCacheLevel() >= DefaultMessageListenerContainer.CACHE_CONSUMER
+            && jmsConfig.getMaxSuspendedContinuations() > 0) {
+            LOG.info("maxSuspendedContinuations value will be ignored, "
+                     + "please set cacheLevel to a value less than "
+                     + "org.springframework.jms.listener.DefaultMessageListenerContainer.CACHE_CONSUMER");
+        }
String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
jmsListener.setMessageSelector("JMSCorrelationID LIKE '"
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Fri Apr 17 13:58:12 2009
@@ -87,7 +87,9 @@
}
jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
jmsConfig.setExplicitQosEnabled(true);
- jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());
+ if (jmsConfig.getMessageSelector() == null) {
+ jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());
+ }
if (isConduit && runtimePolicy.isSetMessageType()) {
jmsConfig.setMessageType(runtimePolicy.getMessageType().value());
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Fri Apr 17 13:58:12 2009
@@ -22,20 +22,31 @@
import java.util.Collection;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.logging.Logger;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.SuspendedInvocationException;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class JMSContinuation implements Continuation {
+ static final String BOGUS_MESSAGE_SELECTOR = "org.apache.cxf.transports.jms.continuations=too-many";
+ private static final Logger LOG = LogUtils.getL7dLogger(JMSContinuation.class);
+
private Bus bus;
private Message inMessage;
private MessageObserver incomingObserver;
private Collection<JMSContinuation> continuations;
+ private DefaultMessageListenerContainer jmsListener;
+ private JMSConfiguration jmsConfig;
+
+ private String currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
private Object userObject;
@@ -44,14 +55,16 @@
private boolean isResumed;
private Timer timer = new Timer();
- public JMSContinuation(Bus b,
- Message m,
- MessageObserver observer,
- Collection<JMSContinuation> cList) {
+ public JMSContinuation(Bus b, Message m, MessageObserver observer,
+ Collection<JMSContinuation> cList,
+ DefaultMessageListenerContainer jmsListener,
+ JMSConfiguration jmsConfig) {
bus = b;
inMessage = m;
incomingObserver = observer;
continuations = cList;
+ this.jmsListener = jmsListener;
+ this.jmsConfig = jmsConfig;
}
public Object getObject() {
@@ -87,7 +100,8 @@
}
protected void doResume() {
- continuations.remove(this);
+
+ updateContinuations(true);
BusFactory.setThreadDefaultBus(bus);
try {
@@ -108,8 +122,8 @@
return false;
}
- continuations.add(this);
-
+ updateContinuations(false);
+
isNew = false;
isResumed = false;
isPending = true;
@@ -136,4 +150,47 @@
protected void cancelTimerTask() {
timer.cancel();
}
+
+ /**
+ * Adds this continuation to, or removes it from, the shared continuations collection and,
+ * when throttling applies, switches the listener's message selector accordingly.
+ *
+ * NOTE(review): nothing in this method synchronizes access to the listener's selector,
+ * and the saved selector is kept per continuation instance; behaviour with several
+ * continuations suspended at the same time should be confirmed.
+ *
+ * @param remove true to remove this continuation (doResume passes true), false to add it
+ */
+ protected void updateContinuations(boolean remove) {
+
+ modifyList(remove);
+
+ // no throttling when the limit is negative or the listener caches at CACHE_CONSUMER or above
+ if (jmsConfig.getMaxSuspendedContinuations() < 0
+ || jmsListener.getCacheLevel() >= DefaultMessageListenerContainer.CACHE_CONSUMER) {
+ return;
+ }
+
+ // throttle the flow if there're too many continuation instances in memory
+ // currentMessageSelector differs from the bogus sentinel only after this instance has
+ // saved the listener's selector in the branch below; restore it and reset the sentinel
+ if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
+ LOG.fine("A number of continuations has dropped below the limit of "
+ + jmsConfig.getMaxSuspendedContinuations()
+ + ", resetting JMS MessageSelector to " + currentMessageSelector);
+ jmsListener.setMessageSelector(currentMessageSelector);
+ currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
+ } else if (!remove && continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
+ // remember the listener's selector (may be null) so that it can be restored on removal
+ currentMessageSelector = jmsListener.getMessageSelector();
+ // install the bogus selector unless the listener already carries it
+ if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
+ LOG.fine("A number of continuations has reached the limit of "
+ + jmsConfig.getMaxSuspendedContinuations()
+ + ", setting JMS MessageSelector to " + BOGUS_MESSAGE_SELECTOR);
+ jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR);
+
+ }
+ }
+
+ }
+
+    /**
+     * Registers this continuation in, or drops it from, the shared continuations collection.
+     *
+     * @param remove {@code true} to take this instance out, {@code false} to put it in
+     */
+    protected void modifyList(boolean remove) {
+        if (!remove) {
+            continuations.add(this);
+            return;
+        }
+        continuations.remove(this);
+    }
+
+ /**
+ * Returns the selector this continuation has saved from the listener, or
+ * BOGUS_MESSAGE_SELECTOR (the initial value, also restored after a removal that reset
+ * the listener) when nothing is saved. The saved value is whatever the listener
+ * reported, so it can be null.
+ */
+ String getCurrentMessageSelector() {
+ return currentMessageSelector;
+ }
+
+
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Fri Apr 17 13:58:12 2009
@@ -26,6 +26,8 @@
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class JMSContinuationProvider implements ContinuationProvider {
@@ -33,15 +35,21 @@
private Message inMessage;
private MessageObserver incomingObserver;
private Collection<JMSContinuation> continuations;
+ private DefaultMessageListenerContainer jmsListener;
+ private JMSConfiguration jmsConfig;
public JMSContinuationProvider(Bus b,
Message m,
MessageObserver observer,
- Collection<JMSContinuation> cList) {
+ Collection<JMSContinuation> cList,
+ DefaultMessageListenerContainer jmsListener,
+ JMSConfiguration jmsConfig) {
bus = b;
inMessage = m;
incomingObserver = observer;
continuations = cList;
+ this.jmsListener = jmsListener;
+ this.jmsConfig = jmsConfig;
}
public Continuation getContinuation() {
@@ -50,10 +58,8 @@
}
JMSContinuation cw = inMessage.get(JMSContinuation.class);
if (cw == null) {
- cw = new JMSContinuation(bus,
- inMessage,
- incomingObserver,
- continuations);
+ cw = new JMSContinuation(bus, inMessage, incomingObserver, continuations,
+ jmsListener, jmsConfig);
inMessage.put(JMSContinuation.class, cw);
}
return cw;
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Fri Apr 17 13:58:12 2009
@@ -127,6 +127,8 @@
jmsConfig = destination.getJmsConfig();*/
assertEquals("The concurrentConsumer should be set", jmsConfig.getConcurrentConsumers(), 3);
assertEquals("The maxConcurrentConsumer should be set", jmsConfig.getMaxConcurrentConsumers(), 5);
+ assertEquals("The maxSuspendedContinuations should be set",
+ jmsConfig.getMaxSuspendedContinuations(), 2);
assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory());
assertTrue("Should get the instance of ActiveMQConnectionFactory",
jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory);
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java Fri Apr 17 13:58:12 2009
@@ -36,7 +36,8 @@
exchange.setOneWay(true);
Message m = new MessageImpl();
m.setExchange(exchange);
- JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+ JMSContinuationProvider provider =
+ new JMSContinuationProvider(null, m, null, null, null, null);
assertNull(provider.getContinuation());
}
@@ -44,7 +45,8 @@
public void testGetNewContinuation() {
Message m = new MessageImpl();
m.setExchange(new ExchangeImpl());
- JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+ JMSContinuationProvider provider =
+ new JMSContinuationProvider(null, m, null, null, null, null);
Continuation cw = provider.getContinuation();
assertTrue(cw.isNew());
assertSame(cw, m.get(JMSContinuation.class));
@@ -54,9 +56,9 @@
public void testGetExistingContinuation() {
Message m = new MessageImpl();
m.setExchange(new ExchangeImpl());
- JMSContinuation cw = new JMSContinuation(null, m, null, null);
+ JMSContinuation cw = new JMSContinuation(null, m, null, null, null, null);
m.put(JMSContinuation.class, cw);
- JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+ JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null, null, null);
assertSame(cw, provider.getContinuation());
assertSame(cw, m.get(JMSContinuation.class));
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Fri Apr 17 13:58:12 2009
@@ -28,10 +28,13 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
import org.easymock.classextension.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
public class JMSContinuationTest extends Assert {
@@ -52,7 +55,7 @@
@Test
public void testInitialStatus() {
JMSContinuation cw =
- new JMSContinuation(b, m, observer, continuations);
+ new JMSContinuation(b, m, observer, continuations, null, null);
assertTrue(cw.isNew());
assertFalse(cw.isPending());
assertFalse(cw.isResumed());
@@ -61,7 +64,7 @@
@Test
public void testSuspendResume() {
TestJMSContinuationWrapper cw =
- new TestJMSContinuationWrapper(b, m, observer, continuations);
+ new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
try {
cw.suspend(5000);
fail("SuspendInvocation exception expected");
@@ -96,9 +99,58 @@
}
@Test
+    public void testThrottleWithMessageSelector() {
+        // Runs two suspend/resume cycles on one continuation with a limit of one suspended
+        // continuation and checks the listener's message selector after every step.
+
+        DefaultMessageListenerContainer springContainer = new DefaultMessageListenerContainer();
+        // a cache level below CACHE_CONSUMER, so JMSContinuation does not skip the throttling
+        springContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
+        JMSConfiguration config = new JMSConfiguration();
+        config.setMaxSuspendedContinuations(1);
+
+        TestJMSContinuationWrapper cw =
+            new TestJMSContinuationWrapper(b, m, observer, continuations,
+                                           springContainer, config);
+
+        // before any suspension the listener has no selector and nothing has been saved
+        assertNull(springContainer.getMessageSelector());
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
+
+        suspendResumeCheckSelector(cw, springContainer);
+        // the mock has to be reset before the same expectations are recorded a second time
+        EasyMock.reset(observer);
+        suspendResumeCheckSelector(cw, springContainer);
+
+    }
+
+    /**
+     * Suspends and then resumes the given continuation, asserting after each step the
+     * contents of the continuations list, the selector installed on the listener and the
+     * selector saved by the continuation.
+     *
+     * @param cw the continuation under test
+     * @param springContainer the listener container whose message selector is inspected
+     */
+    private void suspendResumeCheckSelector(JMSContinuation cw,
+                                            DefaultMessageListenerContainer springContainer) {
+        try {
+            cw.suspend(5000);
+            fail("SuspendInvocation exception expected");
+        } catch (SuspendedInvocationException ex) {
+            // ignore
+        }
+        // expected value goes first so that failure messages read correctly
+        assertEquals(1, continuations.size());
+        assertSame(cw, continuations.get(0));
+
+        // a second suspend on an already suspended continuation is refused
+        assertFalse(cw.suspend(1000));
+
+        // the limit of one is reached: the listener is throttled and its previous
+        // (null) selector has been saved by the continuation
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, springContainer.getMessageSelector());
+        assertNull(cw.getCurrentMessageSelector());
+
+        observer.onMessage(m);
+        EasyMock.expectLastCall();
+        EasyMock.replay(observer);
+
+        cw.resume();
+
+        assertEquals(0, continuations.size());
+        EasyMock.verify(observer);
+
+        // resuming restores the listener's selector and resets the saved one
+        assertNull(springContainer.getMessageSelector());
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
+    }
+
+ @Test
public void testUserObject() {
- JMSContinuation cw =
- new JMSContinuation(b, m, observer, continuations);
+ JMSContinuation cw = new JMSContinuation(b, m, observer, continuations, null, null);
assertNull(cw.getObject());
Object userObject = new Object();
cw.setObject(userObject);
@@ -113,8 +165,10 @@
public TestJMSContinuationWrapper(Bus b,
Message m,
MessageObserver observer,
- List<JMSContinuation> cList) {
- super(b, m, observer, cList);
+ List<JMSContinuation> cList,
+ DefaultMessageListenerContainer jmsListener,
+ JMSConfiguration jmsConfig) {
+ super(b, m, observer, cList, jmsListener, jmsConfig);
}
public void createTimerTask(long timeout) {
Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml Fri Apr 17 13:58:12 2009
@@ -107,6 +107,7 @@
p:usingEndpointInfo="false"
p:concurrentConsumers="3"
p:maxConcurrentConsumers="5"
+ p:maxSuspendedContinuations="2"
/>
<!--
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java Fri Apr 17 13:58:12 2009
@@ -61,7 +61,7 @@
launchServer(EmbeddedJMSBrokerLauncher.class, props, null));
assertTrue("server did not launch correctly",
- launchServer(Server2.class, false));
+ launchServer(Server2.class));
serversStarted = true;
}
Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuation;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuationService;
+import org.apache.cxf.systest.http_jetty.continuations.HelloWorker;
+import org.apache.cxf.systest.jms.EmbeddedJMSBrokerLauncher;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * System test that sends several sayHi requests to the continuation-based JMS service
+ * started by {@link Server3} and checks that every one of them completes.
+ */
+public class HelloWorldContinuationsThrottleTest extends AbstractBusClientServerTestBase {
+
+    private static boolean serversStarted;
+    private static final String CONFIG_FILE =
+        "org/apache/cxf/systest/jms/continuations/jms_test_config.xml";
+
+    /**
+     * Launches the embedded JMS broker and Server3 once; later calls return immediately.
+     *
+     * @throws Exception if launching a server fails
+     */
+    @Before
+    public void startServers() throws Exception {
+        if (serversStarted) {
+            return;
+        }
+        Map<String, String> props = new HashMap<String, String>();
+        if (System.getProperty("activemq.store.dir") != null) {
+            props.put("activemq.store.dir", System.getProperty("activemq.store.dir"));
+        }
+        props.put("java.util.logging.config.file",
+                  System.getProperty("java.util.logging.config.file"));
+
+        assertTrue("server did not launch correctly",
+                   launchServer(EmbeddedJMSBrokerLauncher.class, props, null));
+
+        assertTrue("server did not launch correctly",
+                   launchServer(Server3.class));
+        serversStarted = true;
+    }
+
+    /**
+     * Sends one request for "Fred", waits, then sends four more requests and expects all
+     * five workers to finish within 60 seconds.
+     *
+     * @throws Exception if the test is interrupted or the client cannot be created
+     */
+    @Test
+    public void testHttpWrappedContinuatuions() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        Bus bus = bf.createBus(CONFIG_FILE);
+        BusFactory.setDefaultBus(bus);
+
+        QName serviceName = new QName("http://cxf.apache.org/systest/jaxws", "HelloContinuationService");
+
+        URL wsdlURL = getClass().getResource("/org/apache/cxf/systest/jms/continuations/test2.wsdl");
+
+        HelloContinuationService service = new HelloContinuationService(wsdlURL, serviceName);
+        assertNotNull(service);
+        final HelloContinuation helloPort = service.getHelloContinuationPort();
+
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
+                                                             new ArrayBlockingQueue<Runnable>(10));
+        CountDownLatch startSignal = new CountDownLatch(1);
+        CountDownLatch helloDoneSignal = new CountDownLatch(5);
+
+        // shut the pool down even when the test body throws, otherwise its non-daemon
+        // worker threads outlive the failed test
+        try {
+            executor.execute(new HelloWorker(helloPort, "Fred", "", startSignal, helloDoneSignal));
+            startSignal.countDown();
+
+            // NOTE(review): presumably long enough for the 8 second suspension that
+            // HelloWorldWithContinuationsJMS2 applies to "Fred" to time out - confirm
+            Thread.sleep(10000);
+
+            executor.execute(new HelloWorker(helloPort, "Barry", "Jameson", startSignal, helloDoneSignal));
+            executor.execute(new HelloWorker(helloPort, "Harry", "", startSignal, helloDoneSignal));
+            executor.execute(new HelloWorker(helloPort, "Rob", "Davidson", startSignal, helloDoneSignal));
+            executor.execute(new HelloWorker(helloPort, "James", "ServiceMix", startSignal, helloDoneSignal));
+
+            helloDoneSignal.await(60, TimeUnit.SECONDS);
+        } finally {
+            executor.shutdownNow();
+        }
+        System.out.println("Completed : " + (5 - helloDoneSignal.getCount()));
+        assertEquals("Not all invocations have completed", 0, helloDoneSignal.getCount());
+    }
+
+}
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+import javax.jws.WebService;
+import javax.xml.ws.WebServiceContext;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuation;
+
+
+
+/**
+ * Test implementation of HelloContinuation that suspends every new sayHi invocation
+ * through a continuation and answers once the invocation is re-entered.
+ * Requests for "Fred" are suspended for 8 seconds and never resumed explicitly; all other
+ * requests are suspended for 4 seconds and resumed by a background task after 2 seconds.
+ */
+@WebService(name = "HelloContinuation",
+            serviceName = "HelloContinuationService",
+            portName = "HelloContinuationPort",
+            targetNamespace = "http://cxf.apache.org/systest/jaxws",
+            endpointInterface = "org.apache.cxf.systest.http_jetty.continuations.HelloContinuation",
+            wsdlLocation = "org/apache/cxf/systest/jms/continuations/test2.wsdl")
+public class HelloWorldWithContinuationsJMS2 implements HelloContinuation {
+
+    // continuations of suspended invocations keyed by first name; guarded by its own monitor
+    private Map<String, Continuation> suspended =
+        new HashMap<String, Continuation>();
+    // runs the delayed resume tasks
+    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
+                                                       new ArrayBlockingQueue<Runnable>(10));
+
+    @Resource
+    private WebServiceContext context;
+
+    /**
+     * Suspends the invocation when its continuation is new, otherwise builds the greeting.
+     *
+     * @param firstName the first name, also used as the key of the suspended continuation
+     * @param secondName the optional surname; kept as the continuation's user object
+     * @return "Hi " followed by the name(s) once the invocation has been re-entered
+     * @throws RuntimeException if no continuation is available, or if the invocation was
+     *         re-entered without being resumed for a name other than "Fred"
+     */
+    public String sayHi(String firstName, String secondName) {
+
+        Continuation continuation = getContinuation(firstName);
+        if (continuation == null) {
+            throw new RuntimeException("Failed to get continuation");
+        }
+        synchronized (continuation) {
+            if (continuation.isNew()) {
+                Object userObject = secondName != null && secondName.length() > 0
+                                    ? secondName : null;
+                continuation.setObject(userObject);
+                suspendInvocation(firstName, continuation);
+            } else {
+                // only "Fred" is allowed to come back without having been resumed
+                if (!continuation.isResumed() && !"Fred".equals(firstName)) {
+                    throw new RuntimeException("No timeout expected");
+                }
+                StringBuilder sb = new StringBuilder();
+                sb.append(firstName);
+
+                // if the actual parameter is not null
+                if (secondName != null && secondName.length() > 0) {
+                    String surname = continuation.getObject().toString();
+                    sb.append(' ').append(surname);
+                }
+                System.out.println("Saying hi to " + sb.toString());
+                return "Hi " + sb.toString();
+            }
+        }
+        // unreachable
+        return null;
+    }
+
+    /**
+     * Blocks until an invocation for the given name has been recorded as suspended,
+     * re-checking every second.
+     *
+     * @param name the first name to look for
+     * @return true once the invocation is suspended, false if the wait was interrupted
+     */
+    public boolean isRequestSuspended(String name) {
+        synchronized (suspended) {
+            while (!suspended.containsKey(name)) {
+                try {
+                    suspended.wait(1000);
+                } catch (InterruptedException ex) {
+                    // keep the interrupt visible to the caller instead of swallowing it
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            }
+        }
+        System.out.println("Invocation for " + name + " has been suspended");
+
+        return true;
+    }
+
+    /**
+     * Resumes the suspended invocation recorded for the given name, if there is one.
+     *
+     * @param name the first name the invocation was suspended under
+     */
+    public void resumeRequest(final String name) {
+
+        Continuation suspendedCont = null;
+        synchronized (suspended) {
+            suspendedCont = suspended.get(name);
+        }
+
+        if (suspendedCont != null) {
+            synchronized (suspendedCont) {
+                suspendedCont.resume();
+            }
+        }
+    }
+
+    /**
+     * Suspends the continuation, records it under the given name and, for every name but
+     * "Fred", schedules a task that resumes it after two seconds.
+     * The bookkeeping sits in a finally block so that it runs however suspend() exits.
+     */
+    private void suspendInvocation(final String name, Continuation cont) {
+
+        System.out.println("Suspending invocation for " + name);
+
+        try {
+            long timeout = "Fred".equals(name) ? 8000 : 4000;
+            cont.suspend(timeout);
+        } finally {
+            synchronized (suspended) {
+                suspended.put(name, cont);
+            }
+            if (!"Fred".equals(name)) {
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            Thread.sleep(2000);
+                        } catch (InterruptedException ex) {
+                            // ignore: the request is simply resumed earlier than planned
+                        }
+                        resumeRequest(name);
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Returns the continuation recorded for the name, removing it from the map, or asks
+     * the ContinuationProvider of the current message context for one.
+     */
+    private Continuation getContinuation(String name) {
+
+        System.out.println("Getting continuation for " + name);
+
+        synchronized (suspended) {
+            Continuation suspendedCont = suspended.remove(name);
+            if (suspendedCont != null) {
+                return suspendedCont;
+            }
+        }
+
+        ContinuationProvider provider =
+            (ContinuationProvider)context.getMessageContext().get(ContinuationProvider.class.getName());
+        return provider.getContinuation();
+    }
+
+}
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+public class Server3 extends AbstractBusTestServerBase {
+
+    /** Classpath resource handed to the SpringBusFactory when the bus is created. */
+    private static final String CONFIG_FILE =
+        "org/apache/cxf/systest/jms/continuations/jms_test_config.xml";
+
+    /** Creates the bus from CONFIG_FILE, installs it as the default bus, then publishes the endpoint. */
+    protected void run() {
+        Bus bus = new SpringBusFactory().createBus(CONFIG_FILE);
+        BusFactory.setDefaultBus(bus);
+        Endpoint.publish("http://localhost:9000/SoapContext/SoapPort",
+                         new HelloWorldWithContinuationsJMS2());
+    }
+
+    /** Calls start() on a new Server3; on an exception prints the stack trace and exits with -1. */
+    public static void main(String[] args) {
+        try {
+            Server3 server = new Server3();
+            server.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+}
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml Fri Apr 17 13:58:12 2009
@@ -31,6 +31,11 @@
<jms:conduit name="{http://cxf.apache.org/systest/jaxws}HelloContinuationPort.jms-conduit">
<jms:clientConfig clientReceiveTimeout="500000" messageTimeToLive="500000"/>
+ <jms:address jndiConnectionFactoryName="ConnectionFactory"
+ jndiDestinationName="dynamicQueues/test.jmstransport.text">
+ <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
+ <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
+ </jms:address>
<jms:jmsConfig-ref>jmsConf1</jms:jmsConfig-ref>
</jms:conduit>
@@ -44,7 +49,25 @@
<bean id="jmsConf1" class="org.apache.cxf.transport.jms.JMSConfiguration"
p:connectionFactory-ref="singleConnectionFactory"
- p:concurrentConsumers="10"
- p:maxConcurrentConsumers="10"/>
+ p:concurrentConsumers="1"
+ p:maxConcurrentConsumers="1"/>
+
+ <jms:destination name="{http://cxf.apache.org/systest/jaxws}HelloContinuationPort.jms-destination">
+ <jms:address jndiConnectionFactoryName="ConnectionFactory"
+ jndiDestinationName="dynamicQueues/test.jmstransport.text">
+ <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
+ <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
+ </jms:address>
+ <jms:jmsConfig-ref>jmsConf2</jms:jmsConfig-ref>
+ </jms:destination>
+
+ <bean id="jmsConf2" class="org.apache.cxf.transport.jms.JMSConfiguration"
+ p:connectionFactory-ref="jmsConnectionFactory"
+ p:timeToLive="500000"
+ p:concurrentConsumers="1"
+ p:maxConcurrentConsumers="1"
+ p:maxSuspendedContinuations="1"
+ p:cacheLevel="2"
+ />
</beans>
Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl Fri Apr 17 13:58:12 2009
@@ -0,0 +1,76 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<definitions name="HelloContinuationService"
+ targetNamespace="http://cxf.apache.org/systest/jaxws"
+ xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
+ xmlns:tns="http://cxf.apache.org/systest/jaxws"
+ xmlns="http://schemas.xmlsoap.org/wsdl/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:jms="http://cxf.apache.org/transports/jms">
+
+ <message name="isRequestSuspended">
+ <part name="arg0" type="xsd:string"/>
+ </message>
+ <message name="sayHiResponse">
+ <part name="return" type="xsd:string"/>
+ </message>
+ <message name="sayHi">
+ <part name="arg0" type="xsd:string"/>
+ <part name="arg1" type="xsd:string"/>
+ </message>
+ <message name="resumeRequestResponse"/>
+
+ <message name="isRequestSuspendedResponse">
+ <part name="return" type="xsd:boolean"/>
+ </message>
+
+ <message name="resumeRequest">
+ <part name="arg0" type="xsd:string"/>
+ </message>
+ <portType name="HelloContinuation">
+ <operation name="isRequestSuspended">
+ <input message="tns:isRequestSuspended" name="isRequestSuspended"/>
+ <output message="tns:isRequestSuspendedResponse" name="isRequestSuspendedResponse"/>
+ </operation>
+ <operation name="resumeRequest">
+ <input message="tns:resumeRequest" name="resumeRequest"/>
+ <output message="tns:resumeRequestResponse" name="resumeRequestResponse"/>
+ </operation>
+ <operation name="sayHi">
+ <input message="tns:sayHi" name="sayHi"/>
+ <output message="tns:sayHiResponse" name="sayHiResponse"/>
+ </operation>
+ </portType>
+ <binding name="HelloContinuationServiceSoapBinding" type="tns:HelloContinuation">
+ <soap:binding style="rpc" transport="http://cxf.apache.org/transports/jms" />
+ <operation name="isRequestSuspended">
+ <soap:operation soapAction="" style="rpc" />
+ <input name="isRequestSuspended">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </input>
+ <output name="isRequestSuspendedResponse">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </output>
+ </operation>
+ <operation name="resumeRequest">
+ <soap:operation soapAction="" style="rpc" />
+ <input name="resumeRequest">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </input>
+ <output name="resumeRequestResponse">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </output>
+ </operation>
+ <operation name="sayHi">
+ <soap:operation soapAction="" style="rpc" />
+ <input name="sayHi">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </input>
+ <output name="sayHiResponse">
+ <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+ </output>
+ </operation>
+ </binding>
+ <service name="HelloContinuationService">
+ <port binding="tns:HelloContinuationServiceSoapBinding" name="HelloContinuationPort"/>
+ </service>
+</definitions>
\ No newline at end of file
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
svn:mime-type = text/xml