You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2014/02/10 16:07:20 UTC
svn commit: r1566648 - in /cxf/trunk/rt/transports/jms/src:
main/java/org/apache/cxf/transport/jms/
main/java/org/apache/cxf/transport/jms/continuations/
main/java/org/apache/cxf/transport/jms/util/
test/java/org/apache/cxf/transport/jms/ test/java/org...
Author: cschneider
Date: Mon Feb 10 15:07:19 2014
New Revision: 1566648
URL: http://svn.apache.org/r1566648
Log:
Make jms continuations independent of spring jms
Added:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java (with props)
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java (with props)
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java (with props)
Modified:
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Feb 10 15:07:19 2014
@@ -21,12 +21,10 @@ package org.apache.cxf.transport.jms;
import java.io.UnsupportedEncodingException;
import java.util.Calendar;
-import java.util.Collection;
import java.util.GregorianCalendar;
import java.util.Map;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -51,11 +49,11 @@ import org.apache.cxf.security.SecurityC
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractMultiplexDestination;
import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.jms.continuations.JMSContinuation;
import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
import org.apache.cxf.transport.jms.util.JMSSender;
import org.apache.cxf.transport.jms.util.JMSUtil;
import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -71,8 +69,7 @@ public class JMSDestination extends Abst
private Bus bus;
private EndpointInfo ei;
private AbstractMessageListenerContainer jmsListener;
- private Collection<JMSContinuation> continuations =
- new ConcurrentLinkedQueue<JMSContinuation>();
+ private ThrottlingCounter suspendedContinuations;
private ClassLoader loader;
public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) {
@@ -108,6 +105,10 @@ public class JMSDestination extends Abst
Destination targetDestination = resolveTargetDestination();
jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this,
targetDestination);
+ int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100;
+ this.suspendedContinuations = new ThrottlingCounter(new SpringJMSListenerAdapter(this.jmsListener),
+ restartLimit,
+ jmsConfig.getMaxSuspendedContinuations());
}
private Destination resolveTargetDestination() {
@@ -185,13 +186,11 @@ public class JMSDestination extends Abst
inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
((MessageImpl)inMessage).setDestination(this);
if (jmsConfig.getMaxSuspendedContinuations() != 0) {
- inMessage.put(ContinuationProvider.class.getName(),
- new JMSContinuationProvider(bus,
- inMessage,
- incomingObserver,
- continuations,
- jmsListener,
- jmsConfig));
+ JMSContinuationProvider cp = new JMSContinuationProvider(bus,
+ inMessage,
+ incomingObserver,
+ suspendedContinuations);
+ inMessage.put(ContinuationProvider.class.getName(), cp);
}
origBus = BusFactory.getAndSetThreadDefaultBus(bus);
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.cxf.transport.jms.continuations.Counter;
+import org.apache.cxf.transport.jms.util.JMSListenerContainer;
+
+/**
+ * Counter that throttles a jms listener on a high and low water mark.
+ *
+ * When the counter reaches the high watermark the listener will be stopped.
+ * When the counter reaches the low watermark the listener will be started.
+ */
+public class ThrottlingCounter implements Counter {
+
+ private AtomicInteger counter;
+ private int lowWatermark;
+ private int highWatermark;
+ private JMSListenerContainer listenerContainer;
+
+ public ThrottlingCounter(JMSListenerContainer listenerContainer, int lowWatermark, int highWatermark) {
+ this.counter = new AtomicInteger();
+ this.lowWatermark = lowWatermark;
+ this.highWatermark = highWatermark;
+ this.listenerContainer = listenerContainer;
+ }
+
+ public final int incrementAndGet() {
+ int curCounter = counter.incrementAndGet();
+ if (curCounter >= highWatermark && listenerContainer.isRunning()) {
+ listenerContainer.stop();
+ }
+ return curCounter;
+ }
+
+ public final int decrementAndGet() {
+ int curCounter = counter.decrementAndGet();
+ if (curCounter <= lowWatermark && !listenerContainer.isRunning()) {
+ listenerContainer.start();
+ }
+ return curCounter;
+ }
+
+}
\ No newline at end of file
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ThrottlingCounter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.continuations;
+
+public interface Counter {
+ int incrementAndGet();
+ int decrementAndGet();
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/Counter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Mon Feb 10 15:07:19 2014
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.jms.continuations;
-import java.util.Collection;
import java.util.logging.Logger;
import org.apache.cxf.Bus;
@@ -30,43 +29,33 @@ import org.apache.cxf.common.logging.Log
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
import org.apache.cxf.workqueue.WorkQueue;
import org.apache.cxf.workqueue.WorkQueueManager;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class JMSContinuation implements Continuation {
private static final Logger LOG = LogUtils.getL7dLogger(JMSContinuation.class);
private Bus bus;
private Message inMessage;
private MessageObserver incomingObserver;
- private Collection<JMSContinuation> continuations;
- private AbstractMessageListenerContainer jmsListener;
- private JMSConfiguration jmsConfig;
-
+ private Counter suspendendContinuations;
+
private volatile Object userObject;
-
+
private volatile boolean isNew = true;
private volatile boolean isPending;
private volatile boolean isResumed;
private volatile boolean isCanceled;
private WorkQueue workQueue;
private ClassLoader loader;
-
- public JMSContinuation(Bus b, Message m, MessageObserver observer,
- Collection<JMSContinuation> cList,
- AbstractMessageListenerContainer jmsListener,
- JMSConfiguration jmsConfig) {
+
+ public JMSContinuation(Bus b, Message m, MessageObserver observer, Counter suspendendContinuations) {
bus = b;
- inMessage = m;
+ inMessage = m;
incomingObserver = observer;
- continuations = cList;
- this.jmsListener = jmsListener;
- this.jmsConfig = jmsConfig;
+ this.suspendendContinuations = suspendendContinuations;
WorkQueueManager manager = bus.getExtension(WorkQueueManager.class);
if (manager != null) {
- workQueue = manager.getNamedWorkQueue("jms-continuation");
+ workQueue = manager.getNamedWorkQueue("jms-continuation");
if (workQueue == null) {
workQueue = manager.getAutomaticWorkQueue();
}
@@ -110,7 +99,7 @@ public class JMSContinuation implements
}
protected void doResume() {
- updateContinuations(true);
+ suspendendContinuations.decrementAndGet();
ClassLoaderHolder origLoader = null;
Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
try {
@@ -139,9 +128,9 @@ public class JMSContinuation implements
return false;
}
inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
-
- updateContinuations(false);
-
+
+ suspendendContinuations.incrementAndGet();
+
isNew = false;
isResumed = false;
isPending = true;
@@ -167,45 +156,6 @@ public class JMSContinuation implements
protected synchronized void cancelTimerTask() {
isCanceled = true;
}
-
- protected void updateContinuations(boolean remove) {
- if (jmsConfig.getMaxSuspendedContinuations() < 0
- || (jmsListener instanceof DefaultMessageListenerContainer
- && ((DefaultMessageListenerContainer)jmsListener).getCacheLevel()
- >= DefaultMessageListenerContainer.CACHE_CONSUMER)) {
- modifyList(remove);
- return;
- }
-
- // throttle the flow if there're too many continuation instances in memory
- synchronized (continuations) {
- modifyList(remove);
- if (continuations.size() >= jmsConfig.getMaxSuspendedContinuations()) {
- jmsListener.stop();
- } else if (!jmsListener.isRunning()) {
- int limit = jmsConfig.getReconnectPercentOfMax();
- if (limit < 0 || limit > 100) {
- limit = 70;
- }
- limit = (limit * jmsConfig.getMaxSuspendedContinuations()) / 100;
-
- if (continuations.size() <= limit) {
- jmsListener.start();
- }
- }
- }
- }
-
- protected void modifyList(boolean remove) {
- if (remove) {
- continuations.remove(this);
- } else {
- continuations.add(this);
- }
- }
-
-
-
}
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProvider.java Mon Feb 10 15:07:19 2014
@@ -19,38 +19,29 @@
package org.apache.cxf.transport.jms.continuations;
-import java.util.Collection;
-
import org.apache.cxf.Bus;
import org.apache.cxf.continuations.Continuation;
import org.apache.cxf.continuations.ContinuationProvider;
import org.apache.cxf.message.Message;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
public class JMSContinuationProvider implements ContinuationProvider {
private Bus bus;
private Message inMessage;
private MessageObserver incomingObserver;
- private Collection<JMSContinuation> continuations;
- private AbstractMessageListenerContainer jmsListener;
- private JMSConfiguration jmsConfig;
+ private Counter suspendendContinuations;
public JMSContinuationProvider(Bus b,
Message m,
MessageObserver observer,
- Collection<JMSContinuation> cList,
- AbstractMessageListenerContainer jmsListener,
- JMSConfiguration jmsConfig) {
+ Counter suspendendContinuations) {
bus = b;
inMessage = m;
incomingObserver = observer;
- continuations = cList;
- this.jmsListener = jmsListener;
- this.jmsConfig = jmsConfig;
+ this.suspendendContinuations = suspendendContinuations;
}
+
public void complete() {
JMSContinuation cw = inMessage.get(JMSContinuation.class);
if (cw != null) {
@@ -69,8 +60,7 @@ public class JMSContinuationProvider imp
}
JMSContinuation cw = m.get(JMSContinuation.class);
if (cw == null) {
- cw = new JMSContinuation(bus, m, incomingObserver, continuations,
- jmsListener, jmsConfig);
+ cw = new JMSContinuation(bus, m, incomingObserver, suspendendContinuations);
m.put(JMSContinuation.class, cw);
}
return cw;
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms.util;
+
+public interface JMSListenerContainer {
+ boolean isRunning();
+ void stop();
+ void start();
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSListenerContainer.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import org.springframework.jms.listener.AbstractJmsListeningContainer;
+
+public class SpringJMSListenerAdapter implements JMSListenerContainer {
+
+ private AbstractJmsListeningContainer container;
+
+ public SpringJMSListenerAdapter(AbstractJmsListeningContainer container) {
+ this.container = container;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return container.isRunning();
+ }
+
+ @Override
+ public void stop() {
+ container.stop();
+ }
+
+ @Override
+ public void start() {
+ container.start();
+ }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java?rev=1566648&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java (added)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java Mon Feb 10 15:07:19 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import org.apache.cxf.transport.jms.util.JMSListenerContainer;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class ThrottlingCounterTest {
+
+ @Test
+ public void testThrottleWithJmsStartAndStop() {
+ JMSListenerContainer listenerContainer = new DummyJMSListener();
+
+ ThrottlingCounter counter = new ThrottlingCounter(listenerContainer, 0, 1);
+ assertTrue(listenerContainer.isRunning());
+
+ counter.incrementAndGet();
+ assertFalse(listenerContainer.isRunning());
+
+ counter.decrementAndGet();
+ assertTrue(listenerContainer.isRunning());
+
+ }
+
+ public class DummyJMSListener implements JMSListenerContainer {
+ boolean running = true;
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ @Override
+ public void stop() {
+ running = false;
+ }
+
+ @Override
+ public void start() {
+ running = true;
+ }
+
+ }
+
+}
Propchange: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ThrottlingCounterTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationProviderTest.java Mon Feb 10 15:07:19 2014
@@ -26,7 +26,7 @@ import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
-
+import org.easymock.EasyMock;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@@ -46,8 +46,9 @@ public class JMSContinuationProviderTest
exchange.setOneWay(true);
Message m = new MessageImpl();
m.setExchange(exchange);
+ Counter counter = EasyMock.createMock(Counter.class);
JMSContinuationProvider provider =
- new JMSContinuationProvider(null, m, null, null, null, null);
+ new JMSContinuationProvider(null, m, null, counter);
assertNull(provider.getContinuation());
}
@@ -55,8 +56,9 @@ public class JMSContinuationProviderTest
public void testGetNewContinuation() {
Message m = new MessageImpl();
m.setExchange(new ExchangeImpl());
+ Counter counter = EasyMock.createMock(Counter.class);
JMSContinuationProvider provider =
- new JMSContinuationProvider(bus, m, null, null, null, null);
+ new JMSContinuationProvider(bus, m, null, counter);
Continuation cw = provider.getContinuation();
assertTrue(cw.isNew());
assertSame(cw, m.get(JMSContinuation.class));
@@ -66,9 +68,10 @@ public class JMSContinuationProviderTest
public void testGetExistingContinuation() {
Message m = new MessageImpl();
m.setExchange(new ExchangeImpl());
- JMSContinuation cw = new JMSContinuation(bus, m, null, null, null, null);
+ Counter counter = EasyMock.createMock(Counter.class);
+ JMSContinuation cw = new JMSContinuation(bus, m, null, counter);
m.put(JMSContinuation.class, cw);
- JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, null, null, null);
+ JMSContinuationProvider provider = new JMSContinuationProvider(null, m, null, counter);
assertSame(cw, provider.getContinuation());
assertSame(cw, m.get(JMSContinuation.class));
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java?rev=1566648&r1=1566647&r2=1566648&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/continuations/JMSContinuationTest.java Mon Feb 10 15:07:19 2014
@@ -19,32 +19,24 @@
package org.apache.cxf.transport.jms.continuations;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
-
import org.apache.cxf.interceptor.InterceptorChain;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.transport.jms.JMSConfiguration;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.springframework.jms.JmsException;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-
-
public class JMSContinuationTest extends Assert {
private Message m;
- private List<JMSContinuation> continuations;
private Bus b;
private MessageObserver observer;
@@ -55,15 +47,15 @@ public class JMSContinuationTest extends
m.setExchange(exchange);
m.setInterceptorChain(EasyMock.createMock(InterceptorChain.class));
exchange.setInMessage(m);
- continuations = new LinkedList<JMSContinuation>();
+
b = BusFactory.getDefaultBus();
observer = EasyMock.createMock(MessageObserver.class);
}
@Test
public void testInitialStatus() {
- JMSContinuation cw =
- new JMSContinuation(b, m, observer, continuations, null, null);
+ Counter continuations = EasyMock.createMock(Counter.class);
+ JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
assertTrue(cw.isNew());
assertFalse(cw.isPending());
assertFalse(cw.isResumed());
@@ -71,67 +63,39 @@ public class JMSContinuationTest extends
@Test
public void testSuspendResume() {
- TestJMSContinuationWrapper cw =
- new TestJMSContinuationWrapper(b, m, observer, continuations, null, new JMSConfiguration());
-
+ DummyCounter continuations = new DummyCounter();
+ JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
+
cw.suspend(5000);
+ Assert.assertEquals(1, continuations.counter.get());
assertFalse(cw.isNew());
assertTrue(cw.isPending());
assertFalse(cw.isResumed());
- assertTrue(cw.isTaskCreated());
- assertFalse(cw.isTaskCancelled());
- assertEquals(continuations.size(), 1);
- assertSame(continuations.get(0), cw);
assertFalse(cw.suspend(1000));
+ Assert.assertEquals(1, continuations.counter.get());
observer.onMessage(m);
EasyMock.expectLastCall();
EasyMock.replay(observer);
cw.resume();
-
+ Assert.assertEquals(0, continuations.counter.get());
assertFalse(cw.isNew());
assertFalse(cw.isPending());
assertTrue(cw.isResumed());
- assertFalse(cw.isTaskCreated());
- assertTrue(cw.isTaskCancelled());
- assertEquals(continuations.size(), 0);
EasyMock.verify(observer);
}
@Test
- public void testThrottleWithJmsStartAndStop() {
-
- DefaultMessageListenerContainerStub springContainer = new DefaultMessageListenerContainerStub();
- springContainer.setCacheLevel(2);
- JMSConfiguration config = new JMSConfiguration();
- config.setMaxSuspendedContinuations(1);
+ public void testSendMessageOnResume() {
+ Counter continuations = new DummyCounter();
+ JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
- TestJMSContinuationWrapper cw =
- new TestJMSContinuationWrapper(b, m, observer, continuations,
- springContainer, config);
-
- assertFalse(springContainer.isStart());
- assertFalse(springContainer.isStop());
-
- suspendResumeCheckStartAndStop(cw, config, springContainer);
- EasyMock.reset(observer);
- suspendResumeCheckStartAndStop(cw, config, springContainer);
-
- }
-
- private void suspendResumeCheckStartAndStop(JMSContinuation cw, JMSConfiguration config,
- DefaultMessageListenerContainerStub springContainer) {
cw.suspend(5000);
-
- assertEquals(continuations.size(), 1);
- assertSame(continuations.get(0), cw);
- assertTrue(springContainer.isStop());
-
assertFalse(cw.suspend(1000));
observer.onMessage(m);
@@ -140,75 +104,32 @@ public class JMSContinuationTest extends
cw.resume();
- assertEquals(continuations.size(), 0);
- assertTrue(springContainer.isStart());
EasyMock.verify(observer);
}
@Test
public void testUserObject() {
- JMSContinuation cw = new JMSContinuation(b, m, observer, continuations, null, null);
+ Counter continuations = new DummyCounter();
+ JMSContinuation cw = new JMSContinuation(b, m, observer, continuations);
assertNull(cw.getObject());
Object userObject = new Object();
cw.setObject(userObject);
assertSame(userObject, cw.getObject());
}
- private static class TestJMSContinuationWrapper extends JMSContinuation {
-
- private boolean taskCreated;
- private boolean taskCancelled;
-
- public TestJMSContinuationWrapper(Bus b,
- Message m,
- MessageObserver observer,
- List<JMSContinuation> cList,
- DefaultMessageListenerContainer jmsListener,
- JMSConfiguration jmsConfig) {
- super(b, m, observer, cList, jmsListener, jmsConfig);
- }
-
- public void createTimerTask(long timeout) {
- taskCreated = true;
- }
-
- public void cancelTimerTask() {
- taskCancelled = true;
- }
-
- public boolean isTaskCreated() {
- boolean result = taskCreated;
- taskCreated = false;
- return result;
- }
-
- public boolean isTaskCancelled() {
- boolean result = taskCancelled;
- taskCancelled = false;
- return result;
- }
- }
-
- private class DefaultMessageListenerContainerStub extends DefaultMessageListenerContainer {
- private boolean start;
- private boolean stop;
-
- public void start() throws JmsException {
- this.start = true;
- this.stop = false;
- }
+ public class DummyCounter implements Counter {
+ AtomicInteger counter = new AtomicInteger();
- public void stop() throws JmsException {
- this.stop = true;
- this.start = false;
+ @Override
+ public int incrementAndGet() {
+ return counter.incrementAndGet();
}
- public boolean isStart() {
- return this.start;
- }
-
- public boolean isStop() {
- return this.stop;
+ @Override
+ public int decrementAndGet() {
+ return counter.decrementAndGet();
}
+
}
+
}