You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/13 05:34:37 UTC

svn commit: r743980 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Fri Feb 13 04:34:36 2009
New Revision: 743980

URL: http://svn.apache.org/viewvc?rev=743980&view=rev
Log:
Cleaned up the tests a little, and fixed a deadlock situation

Added:
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 13 04:34:36 2009
@@ -251,6 +251,7 @@
      *            the source flow controller.
      */
     public boolean offer(E elem, ISourceController<E> sourceController) {
+        boolean ok = false;
         synchronized (mutex) {
             // If we don't have an fc sink, then just increment the limiter.
             if (controllable == null) {
@@ -263,14 +264,16 @@
                     blockSource(sourceController);
                     setUnThrottleListener();
                 }
-                controllable.flowElemAccepted(this, elem);
-                return true;
+                ok = true;
             } else {
                 blockSource(sourceController);
                 setUnThrottleListener();
-                return false;
             }
         }
+        if( ok ) {
+            controllable.flowElemAccepted(this, elem);
+        }
+        return ok;
     }
 
     private boolean okToAdd(E elem) {

Added: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java?rev=743980&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java (added)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/BrokerConnection.java Fri Feb 13 04:34:36 2009
@@ -0,0 +1,62 @@
+/**
+ * 
+ */
+package org.apache.activemq.flow;
+
+import org.apache.activemq.flow.AbstractTestConnection.ReadReadyListener;
+
+class BrokerConnection extends AbstractTestConnection implements MockBroker.DeliveryTarget {
+    private final Pipe<Message> pipe;
+    private final MockBroker local;
+
+    BrokerConnection(MockBroker local, MockBroker remote, Pipe<Message> pipe) {
+        super(local, remote.getName(), null, pipe);
+        this.pipe = pipe;
+        this.local = local;
+    }
+
+    @Override
+    protected Message getNextMessage() throws InterruptedException {
+        return pipe.read();
+    }
+
+    @Override
+    protected void addReadReadyListener(final ReadReadyListener listener) {
+        pipe.setReadReadyListener(new Pipe.ReadReadyListener<Message>() {
+            public void onReadReady(Pipe<Message> pipe) {
+                listener.onReadReady();
+            }
+        });
+    }
+
+    public Message pollNextMessage() {
+        return pipe.poll();
+    }
+
+    @Override
+    protected void messageReceived(Message m, ISourceController<Message> controller) {
+
+        m = new Message(m);
+        m.hopCount++;
+
+        local.router.route(controller, m);
+    }
+
+    @Override
+    protected void write(Message m, ISourceController<Message> controller) throws InterruptedException {
+        pipe.write(m);
+    }
+
+    public IFlowSink<Message> getSink() {
+        return output;
+    }
+
+    public boolean match(Message message) {
+        // Avoid loops:
+        if (message.hopCount > 0) {
+            return false;
+        }
+
+        return true;
+    }
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBroker.java Fri Feb 13 04:34:36 2009
@@ -9,8 +9,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
-import org.apache.activemq.flow.MockBrokerTest.BrokerConnection;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
@@ -18,6 +16,12 @@
 
 class MockBroker implements TransportAcceptListener {
 
+    public interface DeliveryTarget {
+        public IFlowSink<Message> getSink();
+
+        public boolean match(Message message);
+    }
+
     final Router router=  new Router();
     
     final ArrayList<RemoteConnection> connections = new ArrayList<RemoteConnection>();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 13 04:34:36 2009
@@ -66,77 +66,14 @@
     MockBroker rcvBroker;
     private ArrayList<MockBroker> brokers = new ArrayList<MockBroker>();
     IDispatcher dispatcher;
-
-    public interface DeliveryTarget {
-        public IFlowSink<Message> getSink();
-
-        public boolean match(Message message);
-    }
-
     final AtomicLong msgIdGenerator = new AtomicLong();
 
-    class BrokerConnection extends AbstractTestConnection implements DeliveryTarget {
-        private final Pipe<Message> pipe;
-        private final MockBroker local;
-
-        BrokerConnection(MockBroker local, MockBroker remote, Pipe<Message> pipe) {
-            super(local, remote.getName(), null, pipe);
-            this.pipe = pipe;
-            this.local = local;
-        }
-
-        @Override
-        protected Message getNextMessage() throws InterruptedException {
-            return pipe.read();
-        }
-
-        @Override
-        protected void addReadReadyListener(final ReadReadyListener listener) {
-            pipe.setReadReadyListener(new Pipe.ReadReadyListener<Message>() {
-                public void onReadReady(Pipe<Message> pipe) {
-                    listener.onReadReady();
-                }
-            });
-        }
-
-        public Message pollNextMessage() {
-            return pipe.poll();
-        }
-
-        @Override
-        protected void messageReceived(Message m, ISourceController<Message> controller) {
-
-            m = new Message(m);
-            m.hopCount++;
-
-            local.router.route(controller, m);
-        }
-
-        @Override
-        protected void write(Message m, ISourceController<Message> controller) throws InterruptedException {
-            pipe.write(m);
-        }
-
-        public IFlowSink<Message> getSink() {
-            return output;
-        }
-
-        public boolean match(Message message) {
-            // Avoid loops:
-            if (message.hopCount > 0) {
-                return false;
-            }
-
-            return true;
-        }
-    }
-
-    final Mapper<Long, Message> keyExtractor = new Mapper<Long, Message>() {
+    static public final Mapper<Long, Message> KEY_MAPPER = new Mapper<Long, Message>() {
         public Long map(Message element) {
             return element.getMsgId();
         }
     };
-    final Mapper<Integer, Message> partitionMapper = new Mapper<Integer, Message>() {
+    static public final Mapper<Integer, Message> PARTITION_MAPPER = new Mapper<Integer, Message>() {
         public Integer map(Message element) {
             // we modulo 10 to have at most 10 partitions which the producers
             // gets split across.
@@ -144,96 +81,6 @@
         }
     };
 
-    private void reportRates() throws InterruptedException {
-        System.out.println("Checking rates for test: " + getName());
-        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-            Period p = new Period();
-            Thread.sleep(1000 * 5);
-            System.out.println(totalProducerRate.getRateSummary(p));
-            System.out.println(totalConsumerRate.getRateSummary(p));
-            totalProducerRate.reset();
-            totalConsumerRate.reset();
-        }
-    }
-
-    /**
-     * Test sending with 1 high priority sender. The high priority sender should
-     * have higher throughput than the other low priority senders.
-     * 
-     * @throws Exception
-     */
-    public void test_2_1_1_HighPriorityProducer() throws Exception {
-
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-        RemoteProducer producer = sendBroker.producers.get(0);
-        producer.setPriority(1);
-        producer.getRate().setName("High Priority Producer Rate");
-
-        rcvBroker.consumers.get(0).setThinkTime(1);
-
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
-    }
-
-    /**
-     * Test sending with 1 high priority sender. The high priority sender should
-     * have higher throughput than the other low priority senders.
-     * 
-     * @throws Exception
-     */
-    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
-        producerCount = 2;
-        destCount = 1;
-        consumerCount = 1;
-
-        createConnections();
-        RemoteProducer producer = sendBroker.producers.get(0);
-        producer.setPriority(1);
-        producer.setPriorityMod(3);
-        producer.getRate().setName("High Priority Producer Rate");
-
-        rcvBroker.consumers.get(0).setThinkTime(1);
-
-        // Start 'em up.
-        startServices();
-        try {
-
-            System.out.println("Checking rates for test: " + getName());
-            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
-                Period p = new Period();
-                Thread.sleep(1000 * 5);
-                System.out.println(producer.getRate().getRateSummary(p));
-                System.out.println(totalProducerRate.getRateSummary(p));
-                System.out.println(totalConsumerRate.getRateSummary(p));
-                totalProducerRate.reset();
-                totalConsumerRate.reset();
-            }
-
-        } finally {
-            stopServices();
-        }
-    }
-
     public void test_1_1_1() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -377,6 +224,96 @@
         }
     }
     
+    /**
+     * Test sending with 1 high priority sender. The high priority sender should
+     * have higher throughput than the other low priority senders.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_HighPriorityProducer() throws Exception {
+
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+        RemoteProducer producer = sendBroker.producers.get(0);
+        producer.setPriority(1);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        rcvBroker.consumers.get(0).setThinkTime(1);
+
+        // Start 'em up.
+        startServices();
+        try {
+
+            System.out.println("Checking rates for test: " + getName());
+            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+                Period p = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(producer.getRate().getRateSummary(p));
+                System.out.println(totalProducerRate.getRateSummary(p));
+                System.out.println(totalConsumerRate.getRateSummary(p));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+            }
+
+        } finally {
+            stopServices();
+        }
+    }
+
+    /**
+     * Test sending with 1 high priority sender. The high priority sender should
+     * have higher throughput than the other low priority senders.
+     * 
+     * @throws Exception
+     */
+    public void test_2_1_1_MixedHighPriorityProducer() throws Exception {
+        producerCount = 2;
+        destCount = 1;
+        consumerCount = 1;
+
+        createConnections();
+        RemoteProducer producer = sendBroker.producers.get(0);
+        producer.setPriority(1);
+        producer.setPriorityMod(3);
+        producer.getRate().setName("High Priority Producer Rate");
+
+        rcvBroker.consumers.get(0).setThinkTime(1);
+
+        // Start 'em up.
+        startServices();
+        try {
+
+            System.out.println("Checking rates for test: " + getName());
+            for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+                Period p = new Period();
+                Thread.sleep(1000 * 5);
+                System.out.println(producer.getRate().getRateSummary(p));
+                System.out.println(totalProducerRate.getRateSummary(p));
+                System.out.println(totalConsumerRate.getRateSummary(p));
+                totalProducerRate.reset();
+                totalConsumerRate.reset();
+            }
+
+        } finally {
+            stopServices();
+        }
+    }
+        
+    private void reportRates() throws InterruptedException {
+        System.out.println("Checking rates for test: " + getName()+", "+(ptp?"ptp":"topic"));
+        for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
+            Period p = new Period();
+            Thread.sleep(1000 * 5);
+            System.out.println(totalProducerRate.getRateSummary(p));
+            System.out.println(totalConsumerRate.getRateSummary(p));
+            totalProducerRate.reset();
+            totalConsumerRate.reset();
+        }
+    }
+
     private void createConnections() throws IOException, URISyntaxException {
 
         if (DISPATCH_MODE == AbstractTestConnection.ASYNC || DISPATCH_MODE == AbstractTestConnection.POLLING) {
@@ -462,9 +399,9 @@
         MockQueue queue = new MockQueue();
         queue.setBroker(broker);
         queue.setDestination(destination);
-        queue.setKeyExtractor(keyExtractor);
+        queue.setKeyExtractor(KEY_MAPPER);
         if( usePartitionedQueue ) {
-            queue.setPartitionMapper(partitionMapper);
+            queue.setPartitionMapper(PARTITION_MAPPER);
         }
         return queue;
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockQueue.java Fri Feb 13 04:34:36 2009
@@ -5,7 +5,7 @@
 
 import java.util.HashMap;
 
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.IQueue;
 import org.apache.activemq.queue.Mapper;
 import org.apache.activemq.queue.PartitionedQueue;
@@ -13,7 +13,7 @@
 import org.apache.activemq.queue.SharedQueue;
 import org.apache.activemq.queue.Subscription;
 
-class MockQueue implements MockBrokerTest.DeliveryTarget {
+class MockQueue implements MockBroker.DeliveryTarget {
 
     HashMap<DeliveryTarget, Subscription<Message>> subs = new HashMap<DeliveryTarget, Subscription<Message>>();
     private Destination destination;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Fri Feb 13 04:34:36 2009
@@ -7,7 +7,7 @@
 
 import org.apache.activemq.dispatch.IDispatcher;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 import org.apache.activemq.queue.ExclusivePriorityQueue;
 import org.apache.activemq.queue.ExclusiveQueue;
 import org.apache.activemq.queue.IFlowQueue;

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java?rev=743980&r1=743979&r2=743980&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/Router.java Fri Feb 13 04:34:36 2009
@@ -7,7 +7,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 
-import org.apache.activemq.flow.MockBrokerTest.DeliveryTarget;
+import org.apache.activemq.flow.MockBroker.DeliveryTarget;
 
 public class Router {
     final HashMap<Destination, Collection<DeliveryTarget>> lookupTable = new HashMap<Destination, Collection<DeliveryTarget>>();