You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ff...@apache.org on 2010/03/03 08:50:11 UTC

svn commit: r918354 - in /servicemix/smx3/trunk/core/servicemix-core/src: main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java

Author: ffang
Date: Wed Mar  3 07:50:11 2010
New Revision: 918354

URL: http://svn.apache.org/viewvc?rev=918354&view=rev
Log:
[SM-1937] Incorrect logic in throttle method of DeliveryChannelImpl

Modified:
    servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
    servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java

Modified: servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?rev=918354&r1=918353&r2=918354&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java Wed Mar  3 07:50:11 2010
@@ -338,16 +338,19 @@
 
     protected void throttle() {
         if (component.isExchangeThrottling()) {
-            if (component.getThrottlingInterval() > intervalCount) {
-                intervalCount = 0;
+            if (component.getThrottlingInterval() <= intervalCount) {
+                intervalCount = -1;
                 try {
-                    Thread.sleep(component.getThrottlingTimeout());
+                    long timeout = component.getThrottlingTimeout();
+                    LOG.debug("throttling, sleep for: " + timeout);
+                    Thread.sleep(timeout);
                 } catch (InterruptedException e) {
                     LOG.warn("throttling failed", e);
                 }
             }
             intervalCount++;
         }
+
     }
 
     protected void doSend(MessageExchangeImpl me, boolean sync) throws MessagingException {
@@ -365,8 +368,14 @@
             autoEnlistInTx(me);
             // Update persistence info
             autoSetPersistent(me);
-            // Throttle if needed
-            throttle();
+            if (me.getRole().equals(Role.CONSUMER) 
+                    && me.getStatus().equals(ExchangeStatus.ACTIVE)) {
+                // Throttle if needed
+                // the throttle should happen when send messageexchange from
+                //consumer to provider, so avoid throttling for response me and
+                //Done me
+                throttle();
+            }
             // Store the consumer component
             if (me.getRole() == Role.CONSUMER) {
                 me.setSourceId(component.getComponentNameSpace());

Modified: servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java
URL: http://svn.apache.org/viewvc/servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java?rev=918354&r1=918353&r2=918354&view=diff
==============================================================================
--- servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java (original)
+++ servicemix/smx3/trunk/core/servicemix-core/src/test/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImplTest.java Wed Mar  3 07:50:11 2010
@@ -41,6 +41,7 @@
 import org.apache.servicemix.components.util.ComponentSupport;
 import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
 import org.apache.servicemix.jbi.jaxp.StringSource;
 
 import static org.easymock.EasyMock.*;
@@ -204,6 +205,71 @@
         assertEquals(ExchangeStatus.ERROR, exchange.getStatus());
     }
 
+    public void testThrottle() throws Exception {
+     // Retrieve a delivery channel
+        TestComponent component = new TestComponent(new QName("service"), "endpoint");
+        container.activateComponent(new ActivationSpec("component", component));
+        final DeliveryChannel channel = component.getChannel();
+        // test
+        ComponentMBeanImpl componentMbeanImpl = container.getRegistry().getComponent("component");
+        assertNotNull(componentMbeanImpl);
+        componentMbeanImpl.setExchangeThrottling(true);
+        componentMbeanImpl.setThrottlingTimeout(4000);
+        
+
+        class ProviderThread extends Thread {
+            private int counter;
+            private DeliveryChannel channel;
+            public ProviderThread(int counter, DeliveryChannel channel) {
+                this.counter = counter;
+                this.channel = channel;
+            }
+
+            public void run() {
+
+                try {
+                    InOut me = (InOut) channel.accept(10000);
+                    NormalizedMessage nm = me.createMessage();
+                    nm.setContent(new StringSource("<response>" + counter
+                            + "</response>"));
+                    me.setOutMessage(nm);
+                    channel.sendSync(me);
+
+                } catch (MessagingException e) {
+                    LOG.error(e.getMessage(), e);
+
+                }
+            }
+            
+        }
+        
+        
+        for (int i = 0; i < 6; i++) {
+           
+            MessageExchangeFactory factory = channel.createExchangeFactoryForService(new QName("service"));
+            InOut me = factory.createInOutExchange();
+            NormalizedMessage nm = me.createMessage();
+            nm.setContent(new StringSource("<request>" + i + "</request>"));
+            me.setInMessage(nm);
+            Thread t = new ProviderThread(i, channel);
+            t.start();
+            long before = System.currentTimeMillis();
+            channel.sendSync(me, 5000);
+            long after = System.currentTimeMillis();
+            if (i % 2 == 1) {
+                // throttle sleep 4000ms for every 2 message, so
+                // the duration should > 4000ms
+                assertTrue(after - before > 4000);
+            } else {
+                assertTrue(after - before < 4000);
+            }
+            assertEquals(ExchangeStatus.ACTIVE, me.getStatus());
+            me.setStatus(ExchangeStatus.DONE);
+            channel.send(me); 
+            t.join();
+        }
+    }
+    
     private MessageExchangeImpl createMessageExchange(final DeliveryChannelImpl channel) throws MessagingException {
         MessageExchangeFactory factory = channel.createExchangeFactoryForService(new QName("service"));    
         return (MessageExchangeImpl) factory.createInOutExchange();