You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 22:27:38 UTC

svn commit: r753023 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/openwire/ test/java/org/apache/activemq/broker/openwire/

Author: chirino
Date: Thu Mar 12 21:27:38 2009
New Revision: 753023

URL: http://svn.apache.org/viewvc?rev=753023&view=rev
Log:
Flow control is working a little better now.  But still have some kind of timing issue.


Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 21:27:38 2009
@@ -21,10 +21,10 @@
     protected String name;
 
     private int priorityLevels;
-    protected final int outputWindowSize = 1000;
-    protected final int outputResumeThreshold = 900;
-    protected final int inputWindowSize = 1000;
-    protected final int inputResumeThreshold = 500;
+    protected int outputWindowSize = 1000;
+    protected int outputResumeThreshold = 900;
+    protected int inputWindowSize = 1000;
+    protected int inputResumeThreshold = 500;
     
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
@@ -62,29 +62,27 @@
     }
     
     protected final void write(final Object o) {
-        synchronized (transport) {
-            if (blockingWriter==null) {
-                try {
-                    transport.oneway(o);
-                } catch (IOException e) {
-                    onException(e);
-                }
-            } else {
-                try {
-                    blockingWriter.execute(new Runnable() {
-                        public void run() {
-                            if (!stopping.get()) {
-                                try {
-                                    transport.oneway(o);
-                                } catch (IOException e) {
-                                    onException(e);
-                                }
+        if (blockingWriter==null) {
+            try {
+                transport.oneway(o);
+            } catch (IOException e) {
+                onException(e);
+            }
+        } else {
+            try {
+                blockingWriter.execute(new Runnable() {
+                    public void run() {
+                        if (!stopping.get()) {
+                            try {
+                                transport.oneway(o);
+                            } catch (IOException e) {
+                                onException(e);
                             }
                         }
-                    });
-                } catch (RejectedExecutionException re) {
-                    //Must be shutting down.
-                }
+                    }
+                });
+            } catch (RejectedExecutionException re) {
+                //Must be shutting down.
             }
         }
     }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 21:27:38 2009
@@ -141,6 +141,8 @@
                 }
 
                 public Response processMessageAck(MessageAck info) throws Exception {
+                    ConsumerContext ctx = consumers.get(info.getConsumerId());
+                    ctx.ack(info);
                     return ack(command);
                 }
 
@@ -374,6 +376,10 @@
             });
         }
 
+        public void ack(MessageAck info) {
+            limiter.onProtocolCredit(info.getMessageCount());
+        }
+
         public IFlowSink<MessageDelivery> getSink() {
             return queue;
         }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 21:27:38 2009
@@ -117,9 +117,9 @@
     }
     
     public void test_10_10_10() throws Exception {
-        producerCount = 2;
-        destCount = 2;
-        consumerCount = 2;
+        producerCount = 1;
+        destCount = 1;
+        consumerCount = 1;
 
         createConnections();
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 21:27:38 2009
@@ -13,10 +13,13 @@
 import org.apache.activemq.broker.Router;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -25,7 +28,6 @@
 import org.apache.activemq.flow.IFlowSink;
 import org.apache.activemq.flow.IFlowSource;
 import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.metric.MetricAggregator;
 import org.apache.activemq.metric.MetricCounter;
@@ -48,12 +50,11 @@
     private FlowController<MessageDelivery> inboundController;
 
     private ActiveMQDestination activemqDestination;
-
     private ConnectionInfo connectionInfo;
-
     private SessionInfo sessionInfo;
-
     private ConsumerInfo consumerInfo;
+
+    private Message lastMessage;
     
     public void start() throws Exception {
         consumerRate.name("Consumer " + name + " Rate");
@@ -87,7 +88,12 @@
         
         // Setup the input processing..
         Flow flow = new Flow(name, false);
-        SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+        WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
+            protected void sendCredit(int credit) {
+                MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
+                write(ack);
+            }
+        };
         inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() {
             public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
                 messageReceived(controller, elem);
@@ -111,6 +117,7 @@
                 System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
             } else if (command.getClass() == MessageDispatch.class) {
                 MessageDispatch msg = (MessageDispatch) command;
+                lastMessage = msg.getMessage();
                 inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null);
             } else {
                 onException(new Exception("Unrecognized command: " + command));
@@ -124,12 +131,10 @@
         if( schedualWait ) {
             if (thinkTime > 0) {
                 getDispatcher().schedule(new Runnable(){
-
                     public void run() {
                         consumerRate.increment();
                         controller.elementDispatched(elem);
                     }
-                    
                 }, thinkTime, TimeUnit.MILLISECONDS);
                 
             }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 21:27:38 2009
@@ -93,7 +93,7 @@
         sessionInfo = createSessionInfo(connectionInfo);
         transport.oneway(sessionInfo);
         producerInfo = createProducerInfo(sessionInfo);
-        producerInfo.setWindowSize(1024*4);
+        producerInfo.setWindowSize(outputWindowSize);
         transport.oneway(producerInfo);
         
         dispatchContext = getDispatcher().register(this, name + "-client");
@@ -101,11 +101,12 @@
     }
     
     protected void initialize() {
-        Flow ouboundFlow = new Flow(name, false);
-        outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
-        outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+        Flow flow = new Flow(name, false);
+        outputResumeThreshold = outputWindowSize/2;
+        outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, outputWindowSize, outputResumeThreshold);
+        outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound", outboundLimiter);
         
-        outboundController = outboundQueue.getFlowController(ouboundFlow);
+        outboundController = outboundQueue.getFlowController(flow);
         outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
             public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
                 Message msg = message.asType(Message.class);