You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/13 05:34:37 UTC
svn commit: r743980 - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/
Author: chirino
Date: Fri Feb 13 04:34:36 2009
New Revision: 743980
URL: http://svn.apache.org/viewvc?rev=743980&view=rev
Log:
Cleaned up the tests a little, and fixed a deadlock situation
Added:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 13 04:34:36 2009
@@ -251,6 +251,7 @@
* the source flow controller.
*/
public boolean offer(E elem, ISourceController<E> sourceController) {
+ boolean ok = false;
synchronized (mutex) {
// If we don't have an fc sink, then just increment the limiter.
if (controllable == null) {
@@ -263,14 +264,16 @@
blockSource(sourceController);
setUnThrottleListener();
}
- controllable.flowElemAccepted(this, elem);
- return true;
+ ok = true;
} else {
blockSource(sourceController);
setUnThrottleListener();
- return false;
}
}
+ if( ok ) {
+ controllable.flowElemAccepted(this, elem);
+ }
+ return ok;
}
private boolean okToAdd(E elem) {
Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=743980&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java Fri Feb 13 04:34:36 2009
@@ -0,0 +1,62 @@
+/**
+ *
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+
+class BrokerConnection extends AbstractTestConnection implements MockBroker.DeliveryTarget {
+ private final Pipe<Message> pipe;
+ private final MockBroker local;
+
+ BrokerConnection(MockBroker local, MockBroker remote, Pipe<Message> pipe) {
+ super(local, remote.getName(), null, pipe);
+ this.pipe = pipe;
+ this.local = local;
+ }
+
+ @Override
+ protected Message getNextMessage() throws InterruptedException {
+ return pipe.read();
+ }
+
+ @Override
+ protected void addReadReadyListener(final ReadReadyListener listener) {
+ pipe.setReadReadyListener(new Pipe.ReadReadyListener<Message>() {
+ public void onReadReady(Pipe<Message> pipe) {
+ listener.onReadReady();
+ }
+ });
+ }
+
+ public Message pollNextMessage() {
+ return pipe.poll();
+ }
+
+ @Override
+ protected void messageReceived(Message m, ISourceController<Message> controller) {
+
+ m = new Message(m);
+ m.hopCount++;
+
+ local.router.route(controller, m);
+ }
+
+ @Override
+ protected void write(Message m, ISourceController<Message> controller) throws InterruptedException {
+ pipe.write(m);
+ }
+
+ public IFlowSink<Message> getSink() {
+ return output;
+ }
+
+ public boolean match(Message message) {
+ // Avoid loops:
+ if (message.hopCount > 0) {
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 13 04:34:36 2009
@@ -9,8 +9,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
@@ -18,6 +16,12 @@
class MockBroker implements TransportAcceptListener {
+ public interface DeliveryTarget {
+ public IFlowSink<Message> getSink();
+
+ public boolean match(Message message);
+ }
+
final Router router= new Router();
final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 13 04:34:36 2009
@@ -66,77 +66,14 @@
MockBroker rcvBroker;
private ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
IDispatcher dispatcher;
-
- public interface DeliveryTarget {
- public IFlowSink<Message> getSink();
-
- public boolean match(Message message);
- }
-
final AtomicLong msgIdGenerator = new AtomicLong();
- class BrokerConnection extends AbstractTestConnection implements DeliveryTarget {
- private final Pipe<Message> pipe;
- private final MockBroker local;
-
- BrokerConnection(MockBroker local, MockBroker remote, Pipe<Message> pipe) {
- super(local, remote.getName(), null, pipe);
- this.pipe = pipe;
- this.local = local;
- }
-
- @Override
- protected Message getNextMessage() throws InterruptedException {
- return pipe.read();
- }
-
- @Override
- protected void addReadReadyListener(final ReadReadyListener listener) {
- pipe.setReadReadyListener(new Pipe.ReadReadyListener<Message>() {
- public void onReadReady(Pipe<Message> pipe) {
- listener.onReadReady();
- }
- });
- }
-
- public Message pollNextMessage() {
- return pipe.poll();
- }
-
- @Override
- protected void messageReceived(Message m, ISourceController<Message> controller) {
-
- m = new Message(m);
- m.hopCount++;
-
- local.router.route(controller, m);
- }
-
- @Override
- protected void write(Message m, ISourceController<Message> controller) throws InterruptedException {
- pipe.write(m);
- }
-
- public IFlowSink<Message> getSink() {
- return output;
- }
-
- public boolean match(Message message) {
- // Avoid loops:
- if (message.hopCount > 0) {
- return false;
- }
-
- return true;
- }
- }
-
- final Mapper<Long, Message> keyExtractor = new Mapper<Long, Message>() {
+ static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
public Long map(Message element) {
return element.getMsgId();
}
};
- final Mapper<Integer, Message> partitionMapper = new Mapper<Integer, Message>() {
+ static public final Mapper<Integer, Message> PARTITION_MAPPER = new Mapper<Integer, Message>() {
public Integer map(Message element) {
// we modulo 10 to have at most 10 partitions which the producers
// get split across.
@@ -144,96 +81,6 @@
}
};
- private void reportRates() throws InterruptedException {
- System.out.println("Checking rates for test: " + getName());
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
- }
-
- /**
- * Test sending with 1 high priority sender. The high priority sender should
- * have higher throughput than the other low priority senders.
- *
- * @throws Exception
- */
- public void test_2_1_1_HighPriorityProducer() throws Exception {
-
- producerCount = 2;
- destCount = 1;
- consumerCount = 1;
-
- createConnections();
- RemoteProducer producer = sendBroker.producers.get(0);
- producer.setPriority(1);
- producer.getRate().setName("High Priority Producer Rate");
-
- rcvBroker.consumers.get(0).setThinkTime(1);
-
- // Start 'em up.
- startServices();
- try {
-
- System.out.println("Checking rates for test: " + getName());
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(producer.getRate().getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
-
- } finally {
- stopServices();
- }
- }
-
- /**
- * Test sending with 1 high priority sender. The high priority sender should
- * have higher throughput than the other low priority senders.
- *
- * @throws Exception
- */
- public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
- producerCount = 2;
- destCount = 1;
- consumerCount = 1;
-
- createConnections();
- RemoteProducer producer = sendBroker.producers.get(0);
- producer.setPriority(1);
- producer.setPriorityMod(3);
- producer.getRate().setName("High Priority Producer Rate");
-
- rcvBroker.consumers.get(0).setThinkTime(1);
-
- // Start 'em up.
- startServices();
- try {
-
- System.out.println("Checking rates for test: " + getName());
- for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
- Period p = new Period();
- Thread.sleep(1000 * 5);
- System.out.println(producer.getRate().getRateSummary(p));
- System.out.println(totalProducerRate.getRateSummary(p));
- System.out.println(totalConsumerRate.getRateSummary(p));
- totalProducerRate.reset();
- totalConsumerRate.reset();
- }
-
- } finally {
- stopServices();
- }
- }
-
public void test_1_1_1() throws Exception {
producerCount = 1;
destCount = 1;
@@ -377,6 +224,96 @@
}
}
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = sendBroker.producers.get(0);
+ producer.setPriority(1);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ rcvBroker.consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ /**
+ * Test sending with 1 high priority sender. The high priority sender should
+ * have higher throughput than the other low priority senders.
+ *
+ * @throws Exception
+ */
+ public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+ producerCount = 2;
+ destCount = 1;
+ consumerCount = 1;
+
+ createConnections();
+ RemoteProducer producer = sendBroker.producers.get(0);
+ producer.setPriority(1);
+ producer.setPriorityMod(3);
+ producer.getRate().setName("High Priority Producer Rate");
+
+ rcvBroker.consumers.get(0).setThinkTime(1);
+
+ // Start 'em up.
+ startServices();
+ try {
+
+ System.out.println("Checking rates for test: " + getName());
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(producer.getRate().getRateSummary(p));
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+
+ } finally {
+ stopServices();
+ }
+ }
+
+ private void reportRates() throws InterruptedException {
+ System.out.println("Checking rates for test: " + getName()+", "+(ptp?"ptp":"topic"));
+ for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+ Period p = new Period();
+ Thread.sleep(1000 * 5);
+ System.out.println(totalProducerRate.getRateSummary(p));
+ System.out.println(totalConsumerRate.getRateSummary(p));
+ totalProducerRate.reset();
+ totalConsumerRate.reset();
+ }
+ }
+
private void createConnections() throws IOException, URISyntaxException {
if (DISPATCH_MODE == AbstractTestConnection.ASYNC || DISPATCH_MODE == AbstractTestConnection.POLLING) {
@@ -462,9 +399,9 @@
MockQueue queue = new MockQueue();
queue.setBroker(broker);
queue.setDestination(destination);
- queue.setKeyExtractor(keyExtractor);
+ queue.setKeyExtractor(KEY_MAPPER);
if( usePartitionedQueue ) {
- queue.setPartitionMapper(partitionMapper);
+ queue.setPartitionMapper(PARTITION_MAPPER);
}
return queue;
}
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Fri Feb 13 04:34:36 2009
@@ -5,7 +5,7 @@
import java.util.HashMap;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.IQueue;
import org.apache.activemq.queue.Mapper;
import org.apache.activemq.queue.PartitionedQueue;
@@ -13,7 +13,7 @@
import org.apache.activemq.queue.SharedQueue;
import org.apache.activemq.queue.Subscription;
-class MockQueue implements MockBrokerTest.DeliveryTarget {
+class MockQueue implements MockBroker.DeliveryTarget {
HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
private Destination destination;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Fri Feb 13 04:34:36 2009
@@ -7,7 +7,7 @@
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
import org.apache.activemq.queue.ExclusivePriorityQueue;
import org.apache.activemq.queue.ExclusiveQueue;
import org.apache.activemq.queue.IFlowQueue;
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Fri Feb 13 04:34:36 2009
@@ -7,7 +7,7 @@
import java.util.Collection;
import java.util.HashMap;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
public class Router {
final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();