You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 22:27:38 UTC
svn commit: r753023 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/openwire/
test/java/org/apache/activemq/broker/openwire/
Author: chirino
Date: Thu Mar 12 21:27:38 2009
New Revision: 753023
URL: http://svn.apache.org/viewvc?rev=753023&view=rev
Log:
Flow control is working a little better now. But still have some kind of timing issue.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 21:27:38 2009
@@ -21,10 +21,10 @@
protected String name;
private int priorityLevels;
- protected final int outputWindowSize = 1000;
- protected final int outputResumeThreshold = 900;
- protected final int inputWindowSize = 1000;
- protected final int inputResumeThreshold = 500;
+ protected int outputWindowSize = 1000;
+ protected int outputResumeThreshold = 900;
+ protected int inputWindowSize = 1000;
+ protected int inputResumeThreshold = 500;
private IDispatcher dispatcher;
private final AtomicBoolean stopping = new AtomicBoolean();
@@ -62,29 +62,27 @@
}
protected final void write(final Object o) {
- synchronized (transport) {
- if (blockingWriter==null) {
- try {
- transport.oneway(o);
- } catch (IOException e) {
- onException(e);
- }
- } else {
- try {
- blockingWriter.execute(new Runnable() {
- public void run() {
- if (!stopping.get()) {
- try {
- transport.oneway(o);
- } catch (IOException e) {
- onException(e);
- }
+ if (blockingWriter==null) {
+ try {
+ transport.oneway(o);
+ } catch (IOException e) {
+ onException(e);
+ }
+ } else {
+ try {
+ blockingWriter.execute(new Runnable() {
+ public void run() {
+ if (!stopping.get()) {
+ try {
+ transport.oneway(o);
+ } catch (IOException e) {
+ onException(e);
}
}
- });
- } catch (RejectedExecutionException re) {
- //Must be shutting down.
- }
+ }
+ });
+ } catch (RejectedExecutionException re) {
+ //Must be shutting down.
}
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 21:27:38 2009
@@ -141,6 +141,8 @@
}
public Response processMessageAck(MessageAck info) throws Exception {
+ ConsumerContext ctx = consumers.get(info.getConsumerId());
+ ctx.ack(info);
return ack(command);
}
@@ -374,6 +376,10 @@
});
}
+ public void ack(MessageAck info) {
+ limiter.onProtocolCredit(info.getMessageCount());
+ }
+
public IFlowSink<MessageDelivery> getSink() {
return queue;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 21:27:38 2009
@@ -117,9 +117,9 @@
}
public void test_10_10_10() throws Exception {
- producerCount = 2;
- destCount = 2;
- consumerCount = 2;
+ producerCount = 1;
+ destCount = 1;
+ consumerCount = 1;
createConnections();
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 21:27:38 2009
@@ -13,10 +13,13 @@
import org.apache.activemq.broker.Router;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -25,7 +28,6 @@
import org.apache.activemq.flow.IFlowSink;
import org.apache.activemq.flow.IFlowSource;
import org.apache.activemq.flow.ISourceController;
-import org.apache.activemq.flow.SizeLimiter;
import org.apache.activemq.flow.ISinkController.FlowControllable;
import org.apache.activemq.metric.MetricAggregator;
import org.apache.activemq.metric.MetricCounter;
@@ -48,12 +50,11 @@
private FlowController<MessageDelivery> inboundController;
private ActiveMQDestination activemqDestination;
-
private ConnectionInfo connectionInfo;
-
private SessionInfo sessionInfo;
-
private ConsumerInfo consumerInfo;
+
+ private Message lastMessage;
public void start() throws Exception {
consumerRate.name("Consumer " + name + " Rate");
@@ -87,7 +88,12 @@
// Setup the input processing..
Flow flow = new Flow(name, false);
- SizeLimiter<MessageDelivery> limiter = new SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
+ WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
+ protected void sendCredit(int credit) {
+ MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
+ write(ack);
+ }
+ };
inboundController = new FlowController<MessageDelivery>(new FlowControllable<MessageDelivery>() {
public void flowElemAccepted(ISourceController<MessageDelivery> controller, MessageDelivery elem) {
messageReceived(controller, elem);
@@ -111,6 +117,7 @@
System.out.println("Consumer "+name+" connected to "+((BrokerInfo)command).getBrokerName());
} else if (command.getClass() == MessageDispatch.class) {
MessageDispatch msg = (MessageDispatch) command;
+ lastMessage = msg.getMessage();
inboundController.add(new OpenWireMessageDelivery(msg.getMessage()), null);
} else {
onException(new Exception("Unrecognized command: " + command));
@@ -124,12 +131,10 @@
if( schedualWait ) {
if (thinkTime > 0) {
getDispatcher().schedule(new Runnable(){
-
public void run() {
consumerRate.increment();
controller.elementDispatched(elem);
}
-
}, thinkTime, TimeUnit.MILLISECONDS);
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753023&r1=753022&r2=753023&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java Thu Mar 12 21:27:38 2009
@@ -93,7 +93,7 @@
sessionInfo = createSessionInfo(connectionInfo);
transport.oneway(sessionInfo);
producerInfo = createProducerInfo(sessionInfo);
- producerInfo.setWindowSize(1024*4);
+ producerInfo.setWindowSize(outputWindowSize);
transport.oneway(producerInfo);
dispatchContext = getDispatcher().register(this, name + "-client");
@@ -101,11 +101,12 @@
}
protected void initialize() {
- Flow ouboundFlow = new Flow(name, false);
- outboundLimiter = new WindowLimiter<MessageDelivery>(true, ouboundFlow, outputWindowSize, outputResumeThreshold);
- outboundQueue = new SingleFlowRelay<MessageDelivery>(ouboundFlow, name + "-outbound", outboundLimiter);
+ Flow flow = new Flow(name, false);
+ outputResumeThreshold = outputWindowSize/2;
+ outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow, outputWindowSize, outputResumeThreshold);
+ outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, name + "-outbound", outboundLimiter);
- outboundController = outboundQueue.getFlowController(ouboundFlow);
+ outboundController = outboundQueue.getFlowController(flow);
outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {
public void drain(MessageDelivery message, ISourceController<MessageDelivery> controller) {
Message msg = message.asType(Message.class);