You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 23:40:15 UTC

svn commit: r753041 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/openwire/

Author: chirino
Date: Thu Mar 12 22:40:14 2009
New Revision: 753041

URL: http://svn.apache.org/viewvc?rev=753041&view=rev
Log:
Got all the flow control bits working now.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 22:40:14 2009
@@ -88,16 +88,14 @@
     }
     
     public void onException(IOException error) {
-        onException((Exception) error);
-    }
-
-    public void onException(Exception error) {
         if (!isStopping()) {
-            System.out.println("RemoteConnection error: " + error);
-            error.printStackTrace();
+            onException((Exception) error);
         }
     }
 
+    public void onException(Exception error) {
+    }
+    
     public boolean isStopping(){ 
         return stopping.get();
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 22:40:14 2009
@@ -356,7 +356,7 @@
             selector = parseSelector(info);
 
             Flow flow = new Flow("broker-"+name+"-outbound", false);
-            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+            limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
                 public int getElementSize(MessageDelivery m) {
                     return 1;
                 }
@@ -375,7 +375,9 @@
         }
 
         public void ack(MessageAck info) {
-            limiter.onProtocolCredit(info.getMessageCount());
+            synchronized(queue) {
+                limiter.onProtocolCredit(info.getMessageCount());
+            }
         }
 
         public IFlowSink<MessageDelivery> getSink() {

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 22:40:14 2009
@@ -20,6 +20,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import junit.framework.TestCase;
@@ -38,7 +39,7 @@
 
 public class OpenwireBrokerTest extends TestCase {
 
-    protected static final int PERFORMANCE_SAMPLES = 3000000;
+    protected static final int PERFORMANCE_SAMPLES = 3;
 
     protected static final int IO_WORK_AMOUNT = 0;
     protected static final int FANIN_COUNT = 10;
@@ -77,7 +78,8 @@
     protected ArrayList<Broker> brokers = new ArrayList<Broker>();
     protected IDispatcher dispatcher;
     protected final AtomicLong msgIdGenerator = new AtomicLong();
-
+    protected final AtomicBoolean stopping = new AtomicBoolean();
+    
     final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
     final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
 
@@ -116,22 +118,6 @@
         return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
     }
     
-    public void test_10_10_10() throws Exception {
-        producerCount = 1;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-
-        // Start 'em up.
-        startServices();
-        try {
-            reportRates();
-        } finally {
-            stopServices();
-        }
-    }
-
     public void test_1_1_0() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -227,6 +213,23 @@
         }
     }
 
+    public void test_10_10_10() throws Exception {
+        producerCount = 10;
+        destCount = 10;
+        consumerCount = 10;
+
+        createConnections();
+
+        // Start 'em up.
+        startServices();
+        try {
+            reportRates();
+        } finally {
+            stopServices();
+        }
+    }
+
+
     /**
      * Tests 2 producers sending to 1 destination with 2 consumres, but with
      * consumers set to select only messages from each producer. 1 consumers is
@@ -414,7 +417,14 @@
     }
 
     private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
-        RemoteConsumer consumer = new RemoteConsumer();
+        RemoteConsumer consumer = new RemoteConsumer() {
+            public void onException(Exception error) {
+                if( !stopping.get() ) {
+                    System.err.println("Consumer Async Error:");
+                    error.printStackTrace();
+                }
+            }
+        };
         consumer.setUri(new URI(rcvBroker.getUri()));
         consumer.setDestination(destination);
         consumer.setName("consumer" + (i + 1));
@@ -424,7 +434,14 @@
     }
 
     private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
-        RemoteProducer producer = new RemoteProducer();
+        RemoteProducer producer = new RemoteProducer() {
+            public void onException(Exception error) {
+                if( !stopping.get() ) {
+                    System.err.println("Producer Async Error:");
+                    error.printStackTrace();
+                }
+            }
+        };
         producer.setUri(new URI(sendBroker.getUri()));
         producer.setProducerId(id + 1);
         producer.setName("producer" + (id + 1));
@@ -455,15 +472,16 @@
     }
 
     private void stopServices() throws Exception {
+        stopping.set(true);
+        for (Broker broker : brokers) {
+            broker.stop();
+        }
         for (RemoteProducer connection : producers) {
             connection.stop();
         }
         for (RemoteConsumer connection : consumers) {
             connection.stop();
         }
-        for (Broker broker : brokers) {
-            broker.stop();
-        }
         if (dispatcher != null) {
             dispatcher.shutdown();
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 22:40:14 2009
@@ -78,7 +78,7 @@
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
         consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
-        consumerInfo.setPrefetchSize(1000);
+        consumerInfo.setPrefetchSize(inputWindowSize);
         transport.oneway(consumerInfo);
         
     }
@@ -87,6 +87,7 @@
         
         // Setup the input processing..
         final Flow flow = new Flow("client-"+name+"-inbound", false);
+        inputResumeThreshold = inputWindowSize/2;
         WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
             protected void sendCredit(int credit) {
                 MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);