You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2014/02/10 16:07:20 UTC

svn commit: r1566648 - in /cxf/trunk/rt/transports/jms/src: main/java/org/apache/cxf/transport/jms/ main/java/org/apache/cxf/transport/jms/continuations/ main/java/org/apache/cxf/transport/jms/util/ test/java/org/apache/cxf/transport/jms/ test/java/org/apache/cxf/transport/jms/continuations/

Author: cschneider
Date: Mon Feb 10 15:07:19 2014
New Revision: 1566648

URL: http://svn.apache.org/r1566648
Log:
Make jms continuations independent of spring jms

Added:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java   (with props)
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java   (with props)
Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Feb 10 15:07:19 2014
@@ -21,12 +21,10 @@ package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Calendar;
-import java.util.Collection;
 import java.util.GregorianCalendar;
 import java.util.Map;
 import java.util.SimpleTimeZone;
 import java.util.TimeZone;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -51,11 +49,11 @@ import org.apache.cxf.security.SecurityC
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.jms.continuations.JMSContinuation;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
 import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -71,8 +69,7 @@ public class JMSDestination extends Abst
     private Bus bus;
     private EndpointInfo ei;
     private AbstractMessageListenerContainer jmsListener;
-    private Collection<JMSContinuation> continuations = 
-        new ConcurrentLinkedQueue<JMSContinuation>();
+    private ThrottlingCounter suspendedContinuations;
     private ClassLoader loader;
 
     public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
@@ -108,6 +105,10 @@ public class JMSDestination extends Abst
         Destination targetDestination = resolveTargetDestination();
         jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, 
                                                    targetDestination);
+        int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100;
+        this.suspendedContinuations = new ThrottlingCounter(new SpringJMSListenerAdapter(this.jmsListener), 
+                                                            restartLimit,
+                                                            jmsConfig.getMaxSuspendedContinuations());
     }
 
     private Destination resolveTargetDestination() {
@@ -185,13 +186,11 @@ public class JMSDestination extends Abst
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
             ((MessageImpl)inMessage).setDestination(this);
             if (jmsConfig.getMaxSuspendedContinuations() != 0) {
-                inMessage.put(ContinuationProvider.class.getName(), 
-                              new JMSContinuationProvider(bus,
-                                                          inMessage,
-                                                          incomingObserver,
-                                                          continuations,
-                                                          jmsListener,
-                                                          jmsConfig));
+                JMSContinuationProvider cp = new JMSContinuationProvider(bus, 
+                                                                         inMessage, 
+                                                                         incomingObserver, 
+                                                                         suspendedContinuations);
+                inMessage.put(ContinuationProvider.class.getName(), cp);
             }
             
             origBus = BusFactory.getAndSetThreadDefaultBus(bus);

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cxf.transport.jms.continuations.Counter;
+import org.apache.cxf.transport.jms.util.JMSListenerContainer;
+
+/**
+ * Counter that throttles a jms listener on a high and low water mark.
+ * 
+ * When the counter rises to or above the high watermark the listener is stopped.
+ * When the counter falls to or below the low watermark the listener is started again.
+ */
+public class ThrottlingCounter implements Counter {
+
+    private AtomicInteger counter;
+    private int lowWatermark;
+    private int highWatermark;
+    private JMSListenerContainer listenerContainer;
+    
+    public ThrottlingCounter(JMSListenerContainer listenerContainer, int lowWatermark, int highWatermark) {
+        this.counter = new AtomicInteger();
+        this.lowWatermark =  lowWatermark;
+        this.highWatermark = highWatermark;
+        this.listenerContainer = listenerContainer;
+    }
+    
+    public final int incrementAndGet() {
+        int curCounter = counter.incrementAndGet();
+        if (curCounter >= highWatermark && listenerContainer.isRunning()) {
+            listenerContainer.stop();
+        }
+        return curCounter;
+    }
+    
+    public final int decrementAndGet() {
+        int curCounter = counter.decrementAndGet();
+        if (curCounter <= lowWatermark && !listenerContainer.isRunning()) {
+            listenerContainer.start();
+        }
+        return curCounter;
+    }
+
+}
\ No newline at end of file

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.continuations;
+
+public interface Counter {
+    int incrementAndGet();
+    int decrementAndGet();
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Mon Feb 10 15:07:19 2014
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.jms.continuations;
 
-import java.util.Collection;
 import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
@@ -30,43 +29,33 @@ import org.apache.cxf.common.logging.Log
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.apache.cxf.workqueue.WorkQueue;
 import org.apache.cxf.workqueue.WorkQueueManager;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
 
 public class JMSContinuation implements Continuation {
     private static final Logger LOG = LogUtils.getL7dLogger(JMSContinuation.class);
     private Bus bus;
     private Message inMessage;
     private MessageObserver incomingObserver;
-    private Collection<JMSContinuation> continuations;
-    private AbstractMessageListenerContainer jmsListener;
-    private JMSConfiguration jmsConfig;
-    
+    private Counter suspendendContinuations;
+
     private volatile Object userObject;
-    
+
     private volatile boolean isNew = true;
     private volatile boolean isPending;
     private volatile boolean isResumed;
     private volatile boolean isCanceled;
     private WorkQueue workQueue;
     private ClassLoader loader;
-    
-    public JMSContinuation(Bus b, Message m, MessageObserver observer,
-                           Collection<JMSContinuation> cList, 
-                           AbstractMessageListenerContainer jmsListener,
-                           JMSConfiguration jmsConfig) {
+
+    public JMSContinuation(Bus b, Message m, MessageObserver observer, Counter suspendendContinuations) {
         bus = b;
-        inMessage = m;    
+        inMessage = m;
         incomingObserver = observer;
-        continuations = cList;
-        this.jmsListener = jmsListener;
-        this.jmsConfig = jmsConfig;
+        this.suspendendContinuations = suspendendContinuations;
         WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
         if (manager != null) {
-            workQueue =  manager.getNamedWorkQueue("jms-continuation");
+            workQueue = manager.getNamedWorkQueue("jms-continuation");
             if (workQueue == null) {
                 workQueue = manager.getAutomaticWorkQueue();
             }
@@ -110,7 +99,7 @@ public class JMSContinuation implements 
     }
     
     protected void doResume() {
-        updateContinuations(true);
+        suspendendContinuations.decrementAndGet();
         ClassLoaderHolder origLoader = null;
         Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
         try {
@@ -139,9 +128,9 @@ public class JMSContinuation implements 
             return false;
         }
         inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
-        
-        updateContinuations(false);
-                
+
+        suspendendContinuations.incrementAndGet();
+
         isNew = false;
         isResumed = false;
         isPending = true;
@@ -167,45 +156,6 @@ public class JMSContinuation implements 
     protected synchronized void cancelTimerTask() {
         isCanceled = true;
     }
-    
-    protected void updateContinuations(boolean remove) {
 
-        if (jmsConfig.getMaxSuspendedContinuations() < 0
-            || (jmsListener instanceof DefaultMessageListenerContainer
-                && ((DefaultMessageListenerContainer)jmsListener).getCacheLevel() 
-                    >= DefaultMessageListenerContainer.CACHE_CONSUMER)) {
-            modifyList(remove);
-            return;
-        }
-        
-        // throttle the flow if there're too many continuation instances in memory
-        synchronized (continuations) {
-            modifyList(remove);
-            if (continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
-                jmsListener.stop();
-            } else if (!jmsListener.isRunning()) {
-                int limit = jmsConfig.getReconnectPercentOfMax();
-                if (limit < 0 || limit > 100) {
-                    limit = 70;
-                }
-                limit = (limit * jmsConfig.getMaxSuspendedContinuations()) / 100; 
-            
-                if (continuations.size() <= limit) {
-                    jmsListener.start();
-                }
-            }
-        }
 
-    }
-    
-    protected void modifyList(boolean remove) {
-        if (remove) {
-            continuations.remove(this);
-        } else {
-            continuations.add(this);
-        }
-    }
-    
-    
-    
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Mon Feb 10 15:07:19 2014
@@ -19,38 +19,29 @@
 
 package org.apache.cxf.transport.jms.continuations;
 
-import java.util.Collection;
-
 import org.apache.cxf.Bus;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
 
 public class JMSContinuationProvider implements ContinuationProvider {
 
     private Bus bus;
     private Message inMessage;
     private MessageObserver incomingObserver;
-    private Collection<JMSContinuation> continuations;
-    private AbstractMessageListenerContainer jmsListener;
-    private JMSConfiguration jmsConfig;
+    private Counter suspendendContinuations;
     
     public JMSContinuationProvider(Bus b,
                                    Message m, 
                                    MessageObserver observer,
-                                   Collection<JMSContinuation> cList,
-                                   AbstractMessageListenerContainer jmsListener,
-                                   JMSConfiguration jmsConfig) {
+                                   Counter suspendendContinuations) {
         bus = b;
         inMessage = m;    
         incomingObserver = observer;
-        continuations = cList;
-        this.jmsListener = jmsListener;
-        this.jmsConfig = jmsConfig;
+        this.suspendendContinuations = suspendendContinuations;
     }
+
     public void complete() {
         JMSContinuation cw = inMessage.get(JMSContinuation.class);
         if (cw != null) {
@@ -69,8 +60,7 @@ public class JMSContinuationProvider imp
         }
         JMSContinuation cw = m.get(JMSContinuation.class);
         if (cw == null) {
-            cw = new JMSContinuation(bus, m,  incomingObserver, continuations, 
-                                     jmsListener, jmsConfig);
+            cw = new JMSContinuation(bus, m,  incomingObserver, suspendendContinuations);
             m.put(JMSContinuation.class, cw);
         }
         return cw;

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.util;
+
+public interface JMSListenerContainer {
+    boolean isRunning();
+    void stop();
+    void start();
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import org.springframework.jms.listener.AbstractJmsListeningContainer;
+
+public class SpringJMSListenerAdapter implements JMSListenerContainer {
+    
+    private AbstractJmsListeningContainer container;
+    
+    public SpringJMSListenerAdapter(AbstractJmsListeningContainer container) {
+        this.container = container;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return container.isRunning();
+    }
+
+    @Override
+    public void stop() {
+        container.stop();
+    }
+
+    @Override
+    public void start() {
+        container.start();
+    }
+
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java (added)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import org.apache.cxf.transport.jms.util.JMSListenerContainer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class ThrottlingCounterTest {
+
+    @Test
+    public void testThrottleWithJmsStartAndStop() {
+        JMSListenerContainer listenerContainer = new DummyJMSListener();
+
+        ThrottlingCounter counter = new ThrottlingCounter(listenerContainer, 0, 1);
+        assertTrue(listenerContainer.isRunning());
+        
+        counter.incrementAndGet();
+        assertFalse(listenerContainer.isRunning());
+        
+        counter.decrementAndGet();
+        assertTrue(listenerContainer.isRunning());
+        
+    }
+    
+    public class DummyJMSListener implements JMSListenerContainer {
+        boolean running = true;
+
+        @Override
+        public boolean isRunning() {
+            return running;
+        }
+
+        @Override
+        public void stop() {
+            running = false;
+        }
+
+        @Override
+        public void start() {
+            running = true;
+        }
+
+    }
+
+}

Propchange: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java Mon Feb 10 15:07:19 2014
@@ -26,7 +26,7 @@ import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
-
+import org.easymock.EasyMock;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,8 +46,9 @@ public class JMSContinuationProviderTest
         exchange.setOneWay(true);
         Message m = new MessageImpl();
         m.setExchange(exchange);
+        Counter counter = EasyMock.createMock(Counter.class);
         JMSContinuationProvider provider = 
-            new JMSContinuationProvider(null, m, null, null, null, null);
+            new JMSContinuationProvider(null, m, null, counter);
         assertNull(provider.getContinuation());
     }
     
@@ -55,8 +56,9 @@ public class JMSContinuationProviderTest
     public void testGetNewContinuation() {
         Message m = new MessageImpl();
         m.setExchange(new ExchangeImpl());
+        Counter counter = EasyMock.createMock(Counter.class);
         JMSContinuationProvider provider = 
-            new JMSContinuationProvider(bus, m, null, null, null, null);
+            new JMSContinuationProvider(bus, m, null, counter);
         Continuation cw = provider.getContinuation(); 
         assertTrue(cw.isNew());
         assertSame(cw, m.get(JMSContinuation.class));
@@ -66,9 +68,10 @@ public class JMSContinuationProviderTest
     public void testGetExistingContinuation() {
         Message m = new MessageImpl();
         m.setExchange(new ExchangeImpl());
-        JMSContinuation cw = new JMSContinuation(bus, m, null, null, null, null);
+        Counter counter = EasyMock.createMock(Counter.class);
+        JMSContinuation cw = new JMSContinuation(bus, m, null, counter);
         m.put(JMSContinuation.class, cw);
-        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null, null, null);
+        JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, counter);
         assertSame(cw, provider.getContinuation());
         assertSame(cw, m.get(JMSContinuation.class));
     }

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Mon Feb 10 15:07:19 2014
@@ -19,32 +19,24 @@
 
 package org.apache.cxf.transport.jms.continuations;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
-
 import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.springframework.jms.JmsException;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-
-
 
 public class JMSContinuationTest extends Assert {
 
     private Message m;
-    private List<JMSContinuation> continuations;
     private Bus b;
     private MessageObserver observer;
     
@@ -55,15 +47,15 @@ public class JMSContinuationTest extends
         m.setExchange(exchange);
         m.setInterceptorChain(EasyMock.createMock(InterceptorChain.class));
         exchange.setInMessage(m);
-        continuations = new LinkedList<JMSContinuation>();
+        
         b = BusFactory.getDefaultBus();
         observer = EasyMock.createMock(MessageObserver.class);
     }
     
     @Test
     public void testInitialStatus() {
-        JMSContinuation cw = 
-            new JMSContinuation(b, m, observer, continuations, null, null);
+        Counter continuations = EasyMock.createMock(Counter.class);
+        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
         assertTrue(cw.isNew());
         assertFalse(cw.isPending());
         assertFalse(cw.isResumed());
@@ -71,67 +63,39 @@ public class JMSContinuationTest extends
     
     @Test
     public void testSuspendResume() {
-        TestJMSContinuationWrapper cw = 
-            new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
-        
+        DummyCounter continuations = new DummyCounter();
+        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
+
         cw.suspend(5000);
+        Assert.assertEquals(1, continuations.counter.get());
           
         assertFalse(cw.isNew());
         assertTrue(cw.isPending());
         assertFalse(cw.isResumed());
         
-        assertTrue(cw.isTaskCreated());
-        assertFalse(cw.isTaskCancelled());
-        assertEquals(continuations.size(), 1);
-        assertSame(continuations.get(0), cw);
         
         assertFalse(cw.suspend(1000));
+        Assert.assertEquals(1, continuations.counter.get());
         
         observer.onMessage(m);
         EasyMock.expectLastCall();
         EasyMock.replay(observer);
         
         cw.resume();
-        
+        Assert.assertEquals(0, continuations.counter.get());        
         assertFalse(cw.isNew());
         assertFalse(cw.isPending());
         assertTrue(cw.isResumed());
         
-        assertFalse(cw.isTaskCreated());
-        assertTrue(cw.isTaskCancelled());
-        assertEquals(continuations.size(), 0);
         EasyMock.verify(observer);
     }
     
     @Test
-    public void testThrottleWithJmsStartAndStop() {
-        
-        DefaultMessageListenerContainerStub springContainer = new DefaultMessageListenerContainerStub();
-        springContainer.setCacheLevel(2);
-        JMSConfiguration config = new JMSConfiguration();
-        config.setMaxSuspendedContinuations(1);
+    public void testSendMessageOnResume() {
+        Counter continuations = new DummyCounter();
+        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
         
-        TestJMSContinuationWrapper cw = 
-            new TestJMSContinuationWrapper(b, m, observer, continuations,
-                                           springContainer, config);
-        
-        assertFalse(springContainer.isStart());
-        assertFalse(springContainer.isStop());
-        
-        suspendResumeCheckStartAndStop(cw, config, springContainer);
-        EasyMock.reset(observer);
-        suspendResumeCheckStartAndStop(cw, config, springContainer);
-        
-    }
-    
-    private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config,
-                                            DefaultMessageListenerContainerStub springContainer) {
         cw.suspend(5000);
-            
-        assertEquals(continuations.size(), 1);
-        assertSame(continuations.get(0), cw);
-        assertTrue(springContainer.isStop());
-        
         assertFalse(cw.suspend(1000));
         
         observer.onMessage(m);
@@ -140,75 +104,32 @@ public class JMSContinuationTest extends
         
         cw.resume();
         
-        assertEquals(continuations.size(), 0);
-        assertTrue(springContainer.isStart());
         EasyMock.verify(observer);
     }
     
     @Test
     public void testUserObject() {
-        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations, null, null);
+        Counter continuations = new DummyCounter();
+        JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
         assertNull(cw.getObject());
         Object userObject = new Object();
         cw.setObject(userObject);
         assertSame(userObject, cw.getObject());
     }
     
-    private static class TestJMSContinuationWrapper extends JMSContinuation {
-        
-        private boolean taskCreated;
-        private boolean taskCancelled;
-        
-        public TestJMSContinuationWrapper(Bus b,
-                                          Message m, 
-                                          MessageObserver observer,
-                                          List<JMSContinuation> cList,
-                                          DefaultMessageListenerContainer jmsListener,
-                                          JMSConfiguration jmsConfig) {
-            super(b, m, observer, cList, jmsListener, jmsConfig);
-        }
-        
-        public void createTimerTask(long timeout) {
-            taskCreated = true;
-        }
-        
-        public void cancelTimerTask() {
-            taskCancelled = true;
-        }
-        
-        public boolean isTaskCreated() {
-            boolean result = taskCreated;
-            taskCreated = false;
-            return result;
-        }
-        
-        public boolean isTaskCancelled() {
-            boolean result = taskCancelled;
-            taskCancelled = false;
-            return result;
-        }
-    }
-    
-    private class DefaultMessageListenerContainerStub extends DefaultMessageListenerContainer {
-        private boolean start;
-        private boolean stop;
-
-        public void start() throws JmsException {
-            this.start = true;
-            this.stop = false;
-        }
+    public class DummyCounter implements Counter {
+        AtomicInteger counter = new AtomicInteger();
 
-        public void stop() throws JmsException {
-            this.stop = true;
-            this.start = false;
+        @Override
+        public int incrementAndGet() {
+            return counter.incrementAndGet();
         }
 
-        public boolean isStart() {
-            return this.start;
-        }
-
-        public boolean isStop() {
-            return this.stop;
+        @Override
+        public int decrementAndGet() {
+            return counter.decrementAndGet();
         }
+        
     }
+
 }