You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by se...@apache.org on 2009/04/17 15:58:21 UTC

svn commit: r766013 - in /cxf/trunk: rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ rt/transports/j...

Author: sergeyb
Date: Fri Apr 17 13:58:12 2009
New Revision: 766013

URL: http://svn.apache.org/viewvc?rev=766013&view=rev
Log:
CXF-2002 : support for maxSuspendedContinuations in JMS

Added:
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java   (with props)
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl   (with props)
Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
    cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java
    cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Apr 17 13:58:12 2009
@@ -64,6 +64,7 @@
     private int concurrentConsumers = 1;
     private int maxConcurrentConsumers = 1;
     private int maxConcurrentTasks = 10;
+    private int maxSuspendedContinuations = DEFAULT_VALUE;
 
     private volatile String messageSelector;
     private boolean subscriptionDurable;
@@ -318,6 +319,14 @@
     public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
         this.maxConcurrentConsumers = maxConcurrentConsumers;
     }
+    
+    public int getMaxSuspendedContinuations() {
+        return maxSuspendedContinuations;
+    }
+
+    public void setMaxSuspendedContinuations(int maxSuspendedContinuations) {
+        this.maxSuspendedContinuations = maxSuspendedContinuations;
+    }
 
     public TaskExecutor getTaskExecutor() {
         return taskExecutor;

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Apr 17 13:58:12 2009
@@ -176,12 +176,15 @@
             inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
             inMessage.setDestination(this);
-
-            inMessage.put(ContinuationProvider.class.getName(), 
-                          new JMSContinuationProvider(bus,
-                                                      inMessage,
-                                                      incomingObserver,
-                                                      continuations));
+            if (jmsConfig.getMaxSuspendedContinuations() != 0) {
+                inMessage.put(ContinuationProvider.class.getName(), 
+                              new JMSContinuationProvider(bus,
+                                                          inMessage,
+                                                          incomingObserver,
+                                                          continuations,
+                                                          jmsListener,
+                                                          jmsConfig));
+            }
             
             BusFactory.setThreadDefaultBus(bus);
 

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Apr 17 13:58:12 2009
@@ -18,6 +18,8 @@
  */
 package org.apache.cxf.transport.jms;
 
+import java.util.logging.Logger;
+
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -26,6 +28,7 @@
 import javax.jms.Session;
 import javax.naming.NamingException;
 
+import org.apache.cxf.common.logging.LogUtils;
 import org.springframework.core.task.SimpleAsyncTaskExecutor;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
 import org.springframework.jms.core.JmsTemplate;
@@ -40,6 +43,8 @@
  */
 public final class JMSFactory {
 
+    private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class);
+    
     private JMSFactory() {
     }
 
@@ -141,6 +146,12 @@
         } else if (jmsConfig.getCacheLevel() != JMSConfiguration.DEFAULT_VALUE) {
             jmsListener.setCacheLevel(jmsConfig.getCacheLevel());
         }
+        if (jmsListener.getCacheLevel() >= DefaultMessageListenerContainer.CACHE_CONSUMER
+            && jmsConfig.getMaxSuspendedContinuations() > 0) {
+            LOG.info("maxSuspendedContinuations value will be ignored - "
+                     + ", please set cacheLevel to the value less than "
+                     + " org.springframework.jms.listener.DefaultMessageListenerContainer.CACHE_CONSUMER");
+        }
         String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
         if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
             jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Fri Apr 17 13:58:12 2009
@@ -87,7 +87,9 @@
             }
             jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
             jmsConfig.setExplicitQosEnabled(true);
-            jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());        
+            if (jmsConfig.getMessageSelector() == null) {
+                jmsConfig.setMessageSelector(serverBehavior.getMessageSelector());
+            }
             if (isConduit && runtimePolicy.isSetMessageType()) {
                 jmsConfig.setMessageType(runtimePolicy.getMessageType().value());
             }        

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Fri Apr 17 13:58:12 2009
@@ -22,20 +22,31 @@
 import java.util.Collection;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
 public class JMSContinuation implements Continuation {
 
+    static final String BOGUS_MESSAGE_SELECTOR = "org.apache.cxf.transports.jms.continuations=too-many";
+    private static final Logger LOG = LogUtils.getL7dLogger(JMSContinuation.class);
+    
     private Bus bus;
     private Message inMessage;
     private MessageObserver incomingObserver;
     private Collection<JMSContinuation> continuations;
+    private DefaultMessageListenerContainer jmsListener;
+    private JMSConfiguration jmsConfig;
+    
+    private String currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
     
     private Object userObject;
     
@@ -44,14 +55,16 @@
     private boolean isResumed;
     private Timer timer = new Timer();
     
-    public JMSContinuation(Bus b,
-                                  Message m, 
-                                  MessageObserver observer,
-                                  Collection<JMSContinuation> cList) {
+    public JMSContinuation(Bus b, Message m, MessageObserver observer,
+                           Collection<JMSContinuation> cList, 
+                           DefaultMessageListenerContainer jmsListener,
+                           JMSConfiguration jmsConfig) {
         bus = b;
         inMessage = m;    
         incomingObserver = observer;
         continuations = cList;
+        this.jmsListener = jmsListener;
+        this.jmsConfig = jmsConfig;
     }    
     
     public Object getObject() {
@@ -87,7 +100,8 @@
     }
     
     protected void doResume() {
-        continuations.remove(this);
+        
+        updateContinuations(true);
         
         BusFactory.setThreadDefaultBus(bus);
         try {
@@ -108,8 +122,8 @@
             return false;
         }
         
-        continuations.add(this);
-        
+        updateContinuations(false);
+                
         isNew = false;
         isResumed = false;
         isPending = true;
@@ -136,4 +150,47 @@
     protected void cancelTimerTask() {
         timer.cancel();
     }
+    
+    protected void updateContinuations(boolean remove) {
+
+        modifyList(remove);
+        
+        if (jmsConfig.getMaxSuspendedContinuations() < 0
+            || jmsListener.getCacheLevel() >= DefaultMessageListenerContainer.CACHE_CONSUMER) {
+            return;
+        }
+        
+        // throttle the flow if there're too many continuation instances in memory
+        if (remove && !BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
+            LOG.fine("A number of continuations has dropped below the limit of "
+                     + jmsConfig.getMaxSuspendedContinuations()
+                     + ", resetting JMS MessageSelector to " + currentMessageSelector);
+            jmsListener.setMessageSelector(currentMessageSelector);
+            currentMessageSelector = BOGUS_MESSAGE_SELECTOR;
+        } else if (!remove && continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
+            currentMessageSelector = jmsListener.getMessageSelector();
+            if (!BOGUS_MESSAGE_SELECTOR.equals(currentMessageSelector)) {
+                LOG.fine("A number of continuations has reached the limit of "
+                         + jmsConfig.getMaxSuspendedContinuations()
+                         + ", setting JMS MessageSelector to " + BOGUS_MESSAGE_SELECTOR);
+                jmsListener.setMessageSelector(BOGUS_MESSAGE_SELECTOR);
+                
+            }
+        }
+
+    }
+    
+    protected void modifyList(boolean remove) {
+        if (remove) {
+            continuations.remove(this);
+        } else {
+            continuations.add(this);
+        }
+    }
+    
+    String getCurrentMessageSelector() {
+        return currentMessageSelector;
+    }
+    
+    
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Fri Apr 17 13:58:12 2009
@@ -26,6 +26,8 @@
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
 public class JMSContinuationProvider implements ContinuationProvider {
 
@@ -33,15 +35,21 @@
     private Message inMessage;
     private MessageObserver incomingObserver;
     private Collection<JMSContinuation> continuations;
+    private DefaultMessageListenerContainer jmsListener;
+    private JMSConfiguration jmsConfig;
     
     public JMSContinuationProvider(Bus b,
                                    Message m, 
                                    MessageObserver observer,
-                                   Collection<JMSContinuation> cList) {
+                                   Collection<JMSContinuation> cList,
+                                   DefaultMessageListenerContainer jmsListener,
+                                   JMSConfiguration jmsConfig) {
         bus = b;
         inMessage = m;    
         incomingObserver = observer;
         continuations = cList;
+        this.jmsListener = jmsListener;
+        this.jmsConfig = jmsConfig;
     }
     
     public Continuation getContinuation() {
@@ -50,10 +58,8 @@
         }
         JMSContinuation cw = inMessage.get(JMSContinuation.class);
         if (cw == null) {
-            cw = new JMSContinuation(bus,
-                                           inMessage, 
-                                           incomingObserver,
-                                           continuations);
+            cw = new JMSContinuation(bus, inMessage,  incomingObserver, continuations, 
+                                     jmsListener, jmsConfig);
             inMessage.put(JMSContinuation.class, cw);
         }
         return cw;

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Fri Apr 17 13:58:12 2009
@@ -127,6 +127,8 @@
         jmsConfig = destination.getJmsConfig();*/
         assertEquals("The concurrentConsumer should be set", jmsConfig.getConcurrentConsumers(), 3);
         assertEquals("The maxConcurrentConsumer should be set", jmsConfig.getMaxConcurrentConsumers(), 5);
+        assertEquals("The maxSuspendedContinuations should be set", 
+                     jmsConfig.getMaxSuspendedContinuations(), 2);
         assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory());
         assertTrue("Should get the instance of ActiveMQConnectionFactory", 
                    jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory);

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java Fri Apr 17 13:58:12 2009
@@ -36,7 +36,8 @@
         exchange.setOneWay(true);
         Message m = new MessageImpl();
         m.setExchange(exchange);
-        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+        JMSContinuationProvider provider = 
+            new JMSContinuationProvider(null, m, null, null, null, null);
         assertNull(provider.getContinuation());
     }
     
@@ -44,7 +45,8 @@
     public void testGetNewContinuation() {
         Message m = new MessageImpl();
         m.setExchange(new ExchangeImpl());
-        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+        JMSContinuationProvider provider = 
+            new JMSContinuationProvider(null, m, null, null, null, null);
         Continuation cw = provider.getContinuation(); 
         assertTrue(cw.isNew());
         assertSame(cw, m.get(JMSContinuation.class));
@@ -54,9 +56,9 @@
     public void testGetExistingContinuation() {
         Message m = new MessageImpl();
         m.setExchange(new ExchangeImpl());
-        JMSContinuation cw = new JMSContinuation(null, m, null, null);
+        JMSContinuation cw = new JMSContinuation(null, m, null, null, null, null);
         m.put(JMSContinuation.class, cw);
-        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null);
+        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null, null, null);
         assertSame(cw, provider.getContinuation());
         assertSame(cw, m.get(JMSContinuation.class));
     }

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Fri Apr 17 13:58:12 2009
@@ -28,10 +28,13 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.easymock.classextension.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
 
 
 public class JMSContinuationTest extends Assert {
@@ -52,7 +55,7 @@
     @Test
     public void testInitialStatus() {
         JMSContinuation cw = 
-            new JMSContinuation(b, m, observer, continuations);
+            new JMSContinuation(b, m, observer, continuations, null, null);
         assertTrue(cw.isNew());
         assertFalse(cw.isPending());
         assertFalse(cw.isResumed());
@@ -61,7 +64,7 @@
     @Test
     public void testSuspendResume() {
         TestJMSContinuationWrapper cw = 
-            new TestJMSContinuationWrapper(b, m, observer, continuations);
+            new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
         try {
             cw.suspend(5000);
             fail("SuspendInvocation exception expected");
@@ -96,9 +99,58 @@
     }
     
     @Test
+    public void testThrottleWithMessageSelector() {
+        
+        DefaultMessageListenerContainer springContainer = new DefaultMessageListenerContainer();
+        springContainer.setCacheLevel(2);
+        JMSConfiguration config = new JMSConfiguration();
+        config.setMaxSuspendedContinuations(1);
+        
+        TestJMSContinuationWrapper cw = 
+            new TestJMSContinuationWrapper(b, m, observer, continuations,
+                                           springContainer, config);
+        
+        assertNull(springContainer.getMessageSelector());
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
+        
+        suspendResumeCheckSelector(cw, springContainer);
+        EasyMock.reset(observer);
+        suspendResumeCheckSelector(cw, springContainer);
+        
+    }
+    
+    private void suspendResumeCheckSelector(JMSContinuation cw, 
+                                            DefaultMessageListenerContainer springContainer) {
+        try {
+            cw.suspend(5000);
+            fail("SuspendInvocation exception expected");
+        } catch (SuspendedInvocationException ex) {
+            // ignore
+        }
+        assertEquals(continuations.size(), 1);
+        assertSame(continuations.get(0), cw);
+        
+        assertFalse(cw.suspend(1000));
+        
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, springContainer.getMessageSelector());
+        assertNull(cw.getCurrentMessageSelector());        
+        
+        observer.onMessage(m);
+        EasyMock.expectLastCall();
+        EasyMock.replay(observer);
+        
+        cw.resume();
+        
+        assertEquals(continuations.size(), 0);
+        EasyMock.verify(observer);
+        
+        assertNull(springContainer.getMessageSelector());
+        assertEquals(JMSContinuation.BOGUS_MESSAGE_SELECTOR, cw.getCurrentMessageSelector());
+    }
+    
+    @Test
     public void testUserObject() {
-        JMSContinuation cw = 
-            new JMSContinuation(b, m, observer, continuations);
+        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations, null, null);
         assertNull(cw.getObject());
         Object userObject = new Object();
         cw.setObject(userObject);
@@ -113,8 +165,10 @@
         public TestJMSContinuationWrapper(Bus b,
                                           Message m, 
                                           MessageObserver observer,
-                                          List<JMSContinuation> cList) {
-            super(b, m, observer, cList);
+                                          List<JMSContinuation> cList,
+                                          DefaultMessageListenerContainer jmsListener,
+                                          JMSConfiguration jmsConfig) {
+            super(b, m, observer, cList, jmsListener, jmsConfig);
         }
         
         public void createTimerTask(long timeout) {

Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml Fri Apr 17 13:58:12 2009
@@ -107,6 +107,7 @@
   	p:usingEndpointInfo="false"
   	p:concurrentConsumers="3"
   	p:maxConcurrentConsumers="5"
+  	p:maxSuspendedContinuations="2"
   />    
  
 <!--

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsClientServerTest.java Fri Apr 17 13:58:12 2009
@@ -61,7 +61,7 @@
                    launchServer(EmbeddedJMSBrokerLauncher.class, props, null));
 
         assertTrue("server did not launch correctly", 
-                   launchServer(Server2.class, false));
+                   launchServer(Server2.class));
         serversStarted = true;
     }
     

Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.namespace.QName;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuation;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuationService;
+import org.apache.cxf.systest.http_jetty.continuations.HelloWorker;
+import org.apache.cxf.systest.jms.EmbeddedJMSBrokerLauncher;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class HelloWorldContinuationsThrottleTest extends AbstractBusClientServerTestBase {
+    
+    private static boolean serversStarted;
+    private static final String CONFIG_FILE =
+        "org/apache/cxf/systest/jms/continuations/jms_test_config.xml";
+
+    @Before
+    public void startServers() throws Exception {
+        if (serversStarted) {
+            return;
+        }
+        Map<String, String> props = new HashMap<String, String>();                
+        if (System.getProperty("activemq.store.dir") != null) {
+            props.put("activemq.store.dir", System.getProperty("activemq.store.dir"));
+        }
+        props.put("java.util.logging.config.file", 
+                  System.getProperty("java.util.logging.config.file"));
+        
+        assertTrue("server did not launch correctly", 
+                   launchServer(EmbeddedJMSBrokerLauncher.class, props, null));
+
+        assertTrue("server did not launch correctly", 
+                   launchServer(Server3.class));
+        serversStarted = true;
+    }
+    
+    @Test
+    public void testHttpWrappedContinuatuions() throws Exception {
+        SpringBusFactory bf = new SpringBusFactory();
+        Bus bus = bf.createBus(CONFIG_FILE);
+        BusFactory.setDefaultBus(bus);
+        
+        QName serviceName = new QName("http://cxf.apache.org/systest/jaxws", "HelloContinuationService");
+        
+        URL wsdlURL = getClass().getResource("/org/apache/cxf/systest/jms/continuations/test2.wsdl");
+        
+        HelloContinuationService service = new HelloContinuationService(wsdlURL, serviceName);
+        assertNotNull(service);
+        final HelloContinuation helloPort = service.getHelloContinuationPort();
+        
+        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
+                                                             new ArrayBlockingQueue<Runnable>(10));
+        CountDownLatch startSignal = new CountDownLatch(1);
+        CountDownLatch helloDoneSignal = new CountDownLatch(5);
+        
+        executor.execute(new HelloWorker(helloPort, "Fred", "", startSignal, helloDoneSignal));
+        startSignal.countDown();
+        
+        Thread.sleep(10000);
+                
+        executor.execute(new HelloWorker(helloPort, "Barry", "Jameson", startSignal, helloDoneSignal));
+        executor.execute(new HelloWorker(helloPort, "Harry", "", startSignal, helloDoneSignal));
+        executor.execute(new HelloWorker(helloPort, "Rob", "Davidson", startSignal, helloDoneSignal));
+        executor.execute(new HelloWorker(helloPort, "James", "ServiceMix", startSignal, helloDoneSignal));
+        
+                
+        helloDoneSignal.await(60, TimeUnit.SECONDS);
+        executor.shutdownNow();
+        System.out.println("Completed : " + (5 - helloDoneSignal.getCount()));
+        assertEquals("Not all invocations have completed", 0, helloDoneSignal.getCount());
+    }
+        
+}

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldContinuationsThrottleTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Resource;
+import javax.jws.WebService;
+import javax.xml.ws.WebServiceContext;
+
+import org.apache.cxf.continuations.Continuation;
+import org.apache.cxf.continuations.ContinuationProvider;
+import org.apache.cxf.systest.http_jetty.continuations.HelloContinuation;
+
+
+
+@WebService(name = "HelloContinuation", 
+            serviceName = "HelloContinuationService", 
+            portName = "HelloContinuationPort", 
+            targetNamespace = "http://cxf.apache.org/systest/jaxws",
+            endpointInterface = "org.apache.cxf.systest.http_jetty.continuations.HelloContinuation",
+            wsdlLocation = "org/apache/cxf/systest/jms/continuations/test2.wsdl")
+public class HelloWorldWithContinuationsJMS2 implements HelloContinuation {    
+    
+    private Map<String, Continuation> suspended = 
+        new HashMap<String, Continuation>();
+    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
+                                        new ArrayBlockingQueue<Runnable>(10));
+    
+    @Resource
+    private WebServiceContext context;
+    
+    public String sayHi(String firstName, String secondName) {
+        
+        Continuation continuation = getContinuation(firstName);
+        if (continuation == null) {
+            throw new RuntimeException("Failed to get continuation");
+        }
+        synchronized (continuation) {
+            if (continuation.isNew()) {
+                Object userObject = secondName != null && secondName.length() > 0 
+                                    ? secondName : null;
+                continuation.setObject(userObject);
+                suspendInvocation(firstName, continuation);
+            } else {
+                if (!continuation.isResumed() && !"Fred".equals(firstName)) {
+                    throw new RuntimeException("No timeout expected");
+                }
+                StringBuilder sb = new StringBuilder();
+                sb.append(firstName);
+                
+                // if the actual parameter is not null 
+                if (secondName != null && secondName.length() > 0) {
+                    String surname = continuation.getObject().toString();
+                    sb.append(' ').append(surname);
+                }
+                System.out.println("Saying hi to " + sb.toString());
+                return "Hi " + sb.toString();
+            }
+        }
+        // unreachable
+        return null;
+    }
+
+    public boolean isRequestSuspended(String name) {
+        synchronized (suspended) {
+            while (!suspended.containsKey(name)) {
+                try {
+                    suspended.wait(1000);
+                } catch (InterruptedException ex) {
+                    return false;
+                }
+            }
+        }
+        System.out.println("Invocation for " + name + " has been suspended");
+        
+        return true;
+    }
+
+    public void resumeRequest(final String name) {
+        
+        Continuation suspendedCont = null;
+        synchronized (suspended) {
+            suspendedCont = suspended.get(name);
+        }
+        
+        if (suspendedCont != null) {
+            synchronized (suspendedCont) {
+                suspendedCont.resume();
+            }
+        }
+    }
+    
+    private void suspendInvocation(final String name, Continuation cont) {
+        
+        System.out.println("Suspending invocation for " + name);
+        
+        try {
+            long timeout = "Fred".equals(name) ? 8000 : 4000;
+            cont.suspend(timeout);    
+        } finally {
+            synchronized (suspended) {
+                suspended.put(name, cont);
+            }
+            if (!"Fred".equals(name)) {
+                executor.execute(new Runnable() {
+                    public void run() {
+                        try {
+                            Thread.sleep(2000);
+                        } catch (InterruptedException ex) {
+                            // ignore
+                        }       
+                        resumeRequest(name);
+                    }
+                });
+            }
+        }
+    }
+    
+    private Continuation getContinuation(String name) {
+        
+        System.out.println("Getting continuation for " + name);
+        
+        synchronized (suspended) {
+            Continuation suspendedCont = suspended.remove(name);
+            if (suspendedCont != null) {
+                return suspendedCont;
+            }
+        }
+        
+        ContinuationProvider provider = 
+            (ContinuationProvider)context.getMessageContext().get(ContinuationProvider.class.getName());
+        return provider.getContinuation();
+    }
+    
+}

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/HelloWorldWithContinuationsJMS2.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java Fri Apr 17 13:58:12 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms.continuations;
+
+import javax.xml.ws.Endpoint;
+
+import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
+import org.apache.cxf.bus.spring.SpringBusFactory;
+import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
+
+public class Server3 extends AbstractBusTestServerBase {
+
+    private static final String CONFIG_FILE =
+        "org/apache/cxf/systest/jms/continuations/jms_test_config.xml";
+   
+    protected void run()  {
+        SpringBusFactory bf = new SpringBusFactory();
+        Bus bus = bf.createBus(CONFIG_FILE);
+        BusFactory.setDefaultBus(bus);
+        Object implementor = new HelloWorldWithContinuationsJMS2();        
+        String address = "http://localhost:9000/SoapContext/SoapPort";
+        Endpoint.publish(address, implementor);
+    }
+
+
+    public static void main(String[] args) {
+        try {
+            Server3 s = new Server3();
+            s.start();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            System.exit(-1);
+        } finally {
+            System.out.println("done!");
+        }
+    }
+}

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/Server3.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml?rev=766013&r1=766012&r2=766013&view=diff
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml (original)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/jms_test_config.xml Fri Apr 17 13:58:12 2009
@@ -31,6 +31,11 @@
     
     <jms:conduit name="{http://cxf.apache.org/systest/jaxws}HelloContinuationPort.jms-conduit">
       <jms:clientConfig clientReceiveTimeout="500000" messageTimeToLive="500000"/>
+      <jms:address jndiConnectionFactoryName="ConnectionFactory" 
+                   jndiDestinationName="dynamicQueues/test.jmstransport.text">
+                   <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
+                   <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
+               </jms:address>
       <jms:jmsConfig-ref>jmsConf1</jms:jmsConfig-ref>
     </jms:conduit>
     
@@ -44,7 +49,25 @@
     
   <bean id="jmsConf1" class="org.apache.cxf.transport.jms.JMSConfiguration"
     p:connectionFactory-ref="singleConnectionFactory" 
-  	p:concurrentConsumers="10"
-  	p:maxConcurrentConsumers="10"/>
+  	p:concurrentConsumers="1"
+  	p:maxConcurrentConsumers="1"/>
+
+  <jms:destination name="{http://cxf.apache.org/systest/jaxws}HelloContinuationPort.jms-destination">     
+      <jms:address jndiConnectionFactoryName="ConnectionFactory" 
+                   jndiDestinationName="dynamicQueues/test.jmstransport.text">
+                   <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
+                   <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
+               </jms:address>
+      <jms:jmsConfig-ref>jmsConf2</jms:jmsConfig-ref>  
+  </jms:destination>
+
+  <bean id="jmsConf2" class="org.apache.cxf.transport.jms.JMSConfiguration"
+  	p:connectionFactory-ref="jmsConnectionFactory"
+  	p:timeToLive="500000"
+  	p:concurrentConsumers="1"
+  	p:maxConcurrentConsumers="1"
+  	p:maxSuspendedContinuations="1"
+  	p:cacheLevel="2"
+  />  
 
 </beans>

Added: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl?rev=766013&view=auto
==============================================================================
--- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl (added)
+++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl Fri Apr 17 13:58:12 2009
@@ -0,0 +1,76 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<definitions name="HelloContinuationService" 
+   targetNamespace="http://cxf.apache.org/systest/jaxws" 
+   xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" 
+   xmlns:tns="http://cxf.apache.org/systest/jaxws" 
+   xmlns="http://schemas.xmlsoap.org/wsdl/" 
+   xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+   xmlns:jms="http://cxf.apache.org/transports/jms">
+  
+  <message name="isRequestSuspended">
+    <part name="arg0" type="xsd:string"/>
+  </message>
+  <message name="sayHiResponse">
+    <part name="return" type="xsd:string"/>
+  </message>
+  <message name="sayHi">
+    <part name="arg0" type="xsd:string"/>
+    <part name="arg1" type="xsd:string"/>
+  </message>
+  <message name="resumeRequestResponse"/>
+
+  <message name="isRequestSuspendedResponse">
+    <part name="return" type="xsd:boolean"/>
+  </message>
+  
+  <message name="resumeRequest">
+    <part name="arg0" type="xsd:string"/>
+  </message>
+  <portType name="HelloContinuation">
+    <operation name="isRequestSuspended">
+      <input message="tns:isRequestSuspended" name="isRequestSuspended"/>
+      <output message="tns:isRequestSuspendedResponse" name="isRequestSuspendedResponse"/>
+    </operation>
+    <operation name="resumeRequest">
+      <input message="tns:resumeRequest" name="resumeRequest"/>
+      <output message="tns:resumeRequestResponse" name="resumeRequestResponse"/>
+    </operation>
+    <operation name="sayHi">
+      <input message="tns:sayHi" name="sayHi"/>
+      <output message="tns:sayHiResponse" name="sayHiResponse"/>
+    </operation>
+  </portType>
+  <binding name="HelloContinuationServiceSoapBinding" type="tns:HelloContinuation">
+    <soap:binding style="rpc" transport="http://cxf.apache.org/transports/jms" />
+    <operation name="isRequestSuspended">
+      <soap:operation soapAction="" style="rpc" />
+      <input name="isRequestSuspended">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </input>
+      <output name="isRequestSuspendedResponse">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </output>
+    </operation>
+    <operation name="resumeRequest">
+      <soap:operation soapAction="" style="rpc" />
+      <input name="resumeRequest">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </input>
+      <output name="resumeRequestResponse">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </output>
+    </operation>
+    <operation name="sayHi">
+      <soap:operation soapAction="" style="rpc" />
+      <input name="sayHi">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </input>
+      <output name="sayHiResponse">
+        <soap:body namespace="http://cxf.apache.org/systest/jaxws" use="literal" />
+      </output>
+    </operation>
+  </binding>
+  <service name="HelloContinuationService">
+    <port binding="tns:HelloContinuationServiceSoapBinding" name="HelloContinuationPort"/>
+  </service>
+</definitions>
\ No newline at end of file

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jms/continuations/test2.wsdl
------------------------------------------------------------------------------
    svn:mime-type = text/xml