You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/03/03 08:50:11 UTC
svn commit: r918354 - in /servicemix/smx3/trunk/core/servicemix-core/src:
main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java
Author: ffang
Date: Wed Mar 3 07:50:11 2010
New Revision: 918354
URL: http://svn.apache.org/viewvc?rev=918354&view=rev
Log:
[SM-1937] Incorrect logic in throttle method of DeliveryChannelImpl
Modified:
servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java
Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=918354&r1=918353&r2=918354&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Wed Mar 3 07:50:11 2010
@@ -338,16 +338,19 @@
protected void throttle() {
if (component.isExchangeThrottling()) {
- if (component.getThrottlingInterval() > intervalCount) {
- intervalCount = 0;
+ if (component.getThrottlingInterval() <= intervalCount) {
+ intervalCount = -1;
try {
- Thread.sleep(component.getThrottlingTimeout());
+ long timeout = component.getThrottlingTimeout();
+ LOG.debug("throttling, sleep for: " + timeout);
+ Thread.sleep(timeout);
} catch (InterruptedException e) {
LOG.warn("throttling failed", e);
}
}
intervalCount++;
}
+
}
protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException {
@@ -365,8 +368,14 @@
autoEnlistInTx(me);
// Update persistence info
autoSetPersistent(me);
- // Throttle if needed
- throttle();
+ if (me.getRole().equals(Role.CONSUMER)
+ && me.getStatus().equals(ExchangeStatus.ACTIVE)) {
+ // Throttle if needed
+ // the throttle should happen when send messageexchange from
+ //consumer to provider, so avoid throttling for response me and
+ //Done me
+ throttle();
+ }
// Store the consumer component
if (me.getRole() == Role.CONSUMER) {
me.setSourceId(component.getComponentNameSpace());
Modified: servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java?rev=918354&r1=918353&r2=918354&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java Wed Mar 3 07:50:11 2010
@@ -41,6 +41,7 @@
import org.apache.servicemix.components.util.ComponentSupport;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
import org.apache.servicemix.jbi.jaxp.StringSource;
import static org.easymock.EasyMock.*;
@@ -204,6 +205,71 @@
assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
}
+ public void testThrottle() throws Exception {
+ // Retrieve a delivery channel
+ TestComponent component = new TestComponent(new QName("service"), "endpoint");
+ container.activateComponent(new ActivationSpec("component", component));
+ final DeliveryChannel channel = component.getChannel();
+ // test
+ ComponentMBeanImpl componentMbeanImpl = container.getRegistry().getComponent("component");
+ assertNotNull(componentMbeanImpl);
+ componentMbeanImpl.setExchangeThrottling(true);
+ componentMbeanImpl.setThrottlingTimeout(4000);
+
+
+ class ProviderThread extends Thread {
+ private int counter;
+ private DeliveryChannel channel;
+ public ProviderThread(int counter, DeliveryChannel channel) {
+ this.counter = counter;
+ this.channel = channel;
+ }
+
+ public void run() {
+
+ try {
+ InOut me = (InOut) channel.accept(10000);
+ NormalizedMessage nm = me.createMessage();
+ nm.setContent(new StringSource("<response>" + counter
+ + "</response>"));
+ me.setOutMessage(nm);
+ channel.sendSync(me);
+
+ } catch (MessagingException e) {
+ LOG.error(e.getMessage(), e);
+
+ }
+ }
+
+ }
+
+
+ for (int i = 0; i < 6; i++) {
+
+ MessageExchangeFactory factory = channel.createExchangeFactoryForService(new QName("service"));
+ InOut me = factory.createInOutExchange();
+ NormalizedMessage nm = me.createMessage();
+ nm.setContent(new StringSource("<request>" + i + "</request>"));
+ me.setInMessage(nm);
+ Thread t = new ProviderThread(i, channel);
+ t.start();
+ long before = System.currentTimeMillis();
+ channel.sendSync(me, 5000);
+ long after = System.currentTimeMillis();
+ if (i % 2 == 1) {
+ // throttle sleep 4000ms for every 2 message, so
+ // the duration should > 4000ms
+ assertTrue(after - before > 4000);
+ } else {
+ assertTrue(after - before < 4000);
+ }
+ assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+ me.setStatus(ExchangeStatus.DONE);
+ channel.send(me);
+ t.join();
+ }
+ }
+
private MessageExchangeImpl createMessageExchange(final DeliveryChannelImpl channel) throws MessagingException {
MessageExchangeFactory factory = channel.createExchangeFactoryForService(new QName("service"));
return (MessageExchangeImpl) factory.createInOutExchange();