You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/12 23:40:15 UTC
svn commit: r753041 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/openwire/
test/java/org/apache/activemq/broker/openwire/
Author: chirino
Date: Thu Mar 12 22:40:14 2009
New Revision: 753041
URL: http://svn.apache.org/viewvc?rev=753041&view=rev
Log:
Got all the flow control bits working now.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/Connection.java Thu Mar 12 22:40:14 2009
@@ -88,16 +88,14 @@
}
public void onException(IOException error) {
- onException((Exception) error);
- }
-
- public void onException(Exception error) {
if (!isStopping()) {
- System.out.println("RemoteConnection error: " + error);
- error.printStackTrace();
+ onException((Exception) error);
}
}
+ public void onException(Exception error) {
+ }
+
public boolean isStopping(){
return stopping.get();
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java Thu Mar 12 22:40:14 2009
@@ -356,7 +356,7 @@
selector = parseSelector(info);
Flow flow = new Flow("broker-"+name+"-outbound", false);
- limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize() / 2) {
+ limiter = new WindowLimiter<MessageDelivery>(true, flow, info.getPrefetchSize(), info.getPrefetchSize()/2) {
public int getElementSize(MessageDelivery m) {
return 1;
}
@@ -375,7 +375,9 @@
}
public void ack(MessageAck info) {
- limiter.onProtocolCredit(info.getMessageCount());
+ synchronized(queue) {
+ limiter.onProtocolCredit(info.getMessageCount());
+ }
}
public IFlowSink<MessageDelivery> getSink() {
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireBrokerTest.java Thu Mar 12 22:40:14 2009
@@ -20,6 +20,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
@@ -38,7 +39,7 @@
public class OpenwireBrokerTest extends TestCase {
- protected static final int PERFORMANCE_SAMPLES = 3000000;
+ protected static final int PERFORMANCE_SAMPLES = 3;
protected static final int IO_WORK_AMOUNT = 0;
protected static final int FANIN_COUNT = 10;
@@ -77,7 +78,8 @@
protected ArrayList<Broker> brokers = new ArrayList<Broker>();
protected IDispatcher dispatcher;
protected final AtomicLong msgIdGenerator = new AtomicLong();
-
+ protected final AtomicBoolean stopping = new AtomicBoolean();
+
final ArrayList<RemoteProducer> producers = new ArrayList<RemoteProducer>();
final ArrayList<RemoteConsumer> consumers = new ArrayList<RemoteConsumer>();
@@ -116,22 +118,6 @@
return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Broker.MAX_PRIORITY, asyncThreadPoolSize);
}
- public void test_10_10_10() throws Exception {
- producerCount = 1;
- destCount = 1;
- consumerCount = 1;
-
- createConnections();
-
- // Start 'em up.
- startServices();
- try {
- reportRates();
- } finally {
- stopServices();
- }
- }
-
public void test_1_1_0() throws Exception {
producerCount = 1;
destCount = 1;
@@ -227,6 +213,23 @@
}
}
+ public void test_10_10_10() throws Exception {
+ producerCount = 10;
+ destCount = 10;
+ consumerCount = 10;
+
+ createConnections();
+
+ // Start 'em up.
+ startServices();
+ try {
+ reportRates();
+ } finally {
+ stopServices();
+ }
+ }
+
+
/**
* Tests 2 producers sending to 1 destination with 2 consumres, but with
* consumers set to select only messages from each producer. 1 consumers is
@@ -414,7 +417,14 @@
}
private RemoteConsumer createConsumer(int i, Destination destination) throws URISyntaxException {
- RemoteConsumer consumer = new RemoteConsumer();
+ RemoteConsumer consumer = new RemoteConsumer() {
+ public void onException(Exception error) {
+ if( !stopping.get() ) {
+ System.err.println("Consumer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ };
consumer.setUri(new URI(rcvBroker.getUri()));
consumer.setDestination(destination);
consumer.setName("consumer" + (i + 1));
@@ -424,7 +434,14 @@
}
private RemoteProducer createProducer(int id, Destination destination) throws URISyntaxException {
- RemoteProducer producer = new RemoteProducer();
+ RemoteProducer producer = new RemoteProducer() {
+ public void onException(Exception error) {
+ if( !stopping.get() ) {
+ System.err.println("Producer Async Error:");
+ error.printStackTrace();
+ }
+ }
+ };
producer.setUri(new URI(sendBroker.getUri()));
producer.setProducerId(id + 1);
producer.setName("producer" + (id + 1));
@@ -455,15 +472,16 @@
}
private void stopServices() throws Exception {
+ stopping.set(true);
+ for (Broker broker : brokers) {
+ broker.stop();
+ }
for (RemoteProducer connection : producers) {
connection.stop();
}
for (RemoteConsumer connection : consumers) {
connection.stop();
}
- for (Broker broker : brokers) {
- broker.stop();
- }
if (dispatcher != null) {
dispatcher.shutdown();
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753041&r1=753040&r2=753041&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java Thu Mar 12 22:40:14 2009
@@ -78,7 +78,7 @@
sessionInfo = createSessionInfo(connectionInfo);
transport.oneway(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, activemqDestination);
- consumerInfo.setPrefetchSize(1000);
+ consumerInfo.setPrefetchSize(inputWindowSize);
transport.oneway(consumerInfo);
}
@@ -87,6 +87,7 @@
// Setup the input processing..
final Flow flow = new Flow("client-"+name+"-inbound", false);
+ inputResumeThreshold = inputWindowSize/2;
WindowLimiter<MessageDelivery> limiter = new WindowLimiter<MessageDelivery>(false, flow, inputWindowSize, inputResumeThreshold) {
protected void sendCredit(int credit) {
MessageAck ack = OpenwireSupport.createAck(consumerInfo, lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);