You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/20 21:33:27 UTC

svn commit: r746363 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Fri Feb 20 20:33:26 2009
New Revision: 746363

URL: http://svn.apache.org/viewvc?rev=746363&view=rev
Log:
Applying Colin's patch for AMQ-2132: https://issues.apache.org/activemq/browse/AMQ-2132

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/AbstractPooledDispatcher.java Fri Feb 20 20:33:26 2009
@@ -1,12 +1,12 @@
 package org.apache.activemq.dispatch;
 
 import java.util.ArrayList;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 public abstract class AbstractPooledDispatcher<D extends IDispatcher> implements IDispatcher, PooledDispatcher<D> {
-    
+
     private final String name;
 
     private final ThreadLocal<D> dispatcher = new ThreadLocal<D>();
@@ -17,7 +17,7 @@
     final AtomicBoolean shutdown = new AtomicBoolean();
 
     private int roundRobinCounter = 0;
-    private final int size;
+    private int size;
 
     protected ExecutionLoadBalancer<D> loadBalancer;
 
@@ -73,8 +73,6 @@
                 interrupted = true;
                 continue;
             }
-            dispatchers.remove(dispatchers.size() - 1);
-
         }
         // Re-interrupt:
         if (interrupted) {
@@ -119,6 +117,11 @@
      * A Dispatcher must call this when exiting it's dispatch loop
      */
     public void onDispatcherStopped(D d) {
+        synchronized (dispatchers) {
+            if (dispatchers.remove(d)) {
+                size--;
+            }
+        }
         loadBalancer.removeDispatcher(d);
     }
 
@@ -126,6 +129,10 @@
         D d = dispatcher.get();
         if (d == null) {
             synchronized (dispatchers) {
+                if(dispatchers.isEmpty())
+                {
+                    throw new RejectedExecutionException();
+                }
                 if (++roundRobinCounter >= size) {
                     roundRobinCounter = 0;
                 }

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 20 20:33:26 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.dispatch;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
 public interface IDispatcher {
@@ -48,8 +49,10 @@
          * dispatch. The {@link Dispatchable} will remain in the dispatch queue
          * until a subsequent call to {@link Dispatchable#dispatch()} returns
          * false;
+         * 
+         * @throws RejectedExecutionException If the dispatcher has been shut down.
          */
-        public void requestDispatch();
+        public void requestDispatch() throws RejectedExecutionException;
 
         /**
          * This can be called to update the dispatch priority.

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 20 20:33:26 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.dispatch;
 
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -370,7 +371,7 @@
         final UpdateEvent updateEvent[];
 
         private final ExecutionTracker<D> tracker;
-        private D currentOwner;
+        protected D currentOwner;
         private D updateDispatcher = null;
 
         private int priority;
@@ -433,6 +434,9 @@
 
         public void requestDispatch() {
 
+            if (closed) {
+                throw new RejectedExecutionException();
+            }
             D callingDispatcher = getCurrentDispatcher();
             if (tracker != null)
                 tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
@@ -536,17 +540,17 @@
         }
 
         public void close() {
-            tracker.close();
             D callingDispatcher = getCurrentDispatcher();
             synchronized (this) {
                 closed = true;
-
                 // If the owner of this context is the calling thread, then
                 // delegate to the dispatcher.
                 if (currentOwner == callingDispatcher) {
                     if (isLinked()) {
                         unlink();
                     }
+                    tracker.close();
+
                     // FIXME Deadlock potential!
                     synchronized (foreignQueue) {
                         if (updateEvent[foreignToggle].isLinked()) {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 20 20:33:26 2009
@@ -20,6 +20,7 @@
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.activemq.flow.IFlowLimiter.UnThrottleListener;
 
@@ -270,7 +271,7 @@
                 setUnThrottleListener();
             }
         }
-        if( ok ) {
+        if (ok) {
             controllable.flowElemAccepted(this, elem);
         }
         return ok;
@@ -318,7 +319,8 @@
         waitForResume();
 
         if (!blockedSources.contains(source)) {
-//            System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" + source + "]");
+            // System.out.println("BLOCKING  : SINK[" + this + "], SOURCE[" +
+            // source + "]");
             blockedSources.add(source);
             source.onFlowBlock(this);
         }
@@ -399,7 +401,9 @@
                     try {
                         Thread.currentThread().setName(name);
                         for (ISourceController<E> source : blockedSources) {
-//                            System.out.println("UNBLOCKING: SINK[" + FlowController.this + "], SOURCE[" + source + "]");
+                            // System.out.println("UNBLOCKING: SINK[" +
+                            // FlowController.this + "], SOURCE[" + source +
+                            // "]");
                             source.onFlowResume(FlowController.this);
                         }
                         for (FlowUnblockListener<E> listener : unblockListeners) {
@@ -421,7 +425,12 @@
                 }
             };
 
-            RESUME_SERVICE.execute(resume);
+            try {
+                RESUME_SERVICE.execute(resume);
+            } catch (RejectedExecutionException ree) {
+                // The executor is presumably shutting down; ignore the
+                // rejection and leave resumeScheduled set to true.
+            }
         }
     }
 

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=746363&r1=746362&r2=746363&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 20 20:33:26 2009
@@ -55,7 +55,7 @@
     protected boolean tcp = false;
     // set to force marshalling even in the NON tcp case.
     protected boolean forceMarshalling = false;
-    
+
     protected String sendBrokerURI;
     protected String receiveBrokerURI;
 
@@ -91,13 +91,13 @@
 
     @Override
     protected void setUp() throws Exception {
-        dispatcher = PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
-        
-        if( tcp ) {
+        dispatcher = createDispatcher();
+        dispatcher.start();
+        if (tcp) {
             sendBrokerURI = "tcp://localhost:10000?wireFormat=proto";
             receiveBrokerURI = "tcp://localhost:20000?wireFormat=proto";
         } else {
-            if( forceMarshalling ) {
+            if (forceMarshalling) {
                 sendBrokerURI = "pipe://SendBroker?wireFormat=proto";
                 receiveBrokerURI = "pipe://ReceiveBroker?wireFormat=proto";
             } else {
@@ -106,7 +106,11 @@
             }
         }
     }
-    
+
+    protected IDispatcher createDispatcher() {
+        return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
+    }
+
     public void test_1_1_0() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -121,7 +125,7 @@
             stopServices();
         }
     }
-    
+
     public void test_1_1_1() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -264,7 +268,7 @@
             stopServices();
         }
     }
-    
+
     /**
      * Test sending with 1 high priority sender. The high priority sender should
      * have higher throughput than the other low priority senders.
@@ -342,9 +346,9 @@
             stopServices();
         }
     }
-        
+
     private void reportRates() throws InterruptedException {
-        System.out.println("Checking rates for test: " + getName()+", "+(ptp?"ptp":"topic"));
+        System.out.println("Checking rates for test: " + getName() + ", " + (ptp ? "ptp" : "topic"));
         for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
             Period p = new Period();
             Thread.sleep(1000 * 5);
@@ -390,7 +394,7 @@
             RemoteProducer producer = createProducer(i, destination);
             sendBroker.producers.add(producer);
         }
-        
+
         for (int i = 0; i < consumerCount; i++) {
             Destination destination = dests[i % destCount];
             RemoteConsumer consumer = createConsumer(i, destination);
@@ -398,18 +402,18 @@
         }
 
         // Create MultiBroker connections:
-//        if (multibroker) {
-//            Pipe<Message> pipe = new Pipe<Message>();
-//            sendBroker.createBrokerConnection(rcvBroker, pipe);
-//            rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
-//        }
+        // if (multibroker) {
+        // Pipe<Message> pipe = new Pipe<Message>();
+        // sendBroker.createBrokerConnection(rcvBroker, pipe);
+        // rcvBroker.createBrokerConnection(sendBroker, pipe.connect());
+        // }
     }
 
     private RemoteConsumer createConsumer(int i, Destination destination) {
         RemoteConsumer consumer = new RemoteConsumer();
         consumer.setBroker(rcvBroker);
         consumer.setDestination(destination);
-        consumer.setName("consumer"+(i+1));
+        consumer.setName("consumer" + (i + 1));
         consumer.setTotalConsumerRate(totalConsumerRate);
         consumer.setDispatcher(dispatcher);
         return consumer;
@@ -418,8 +422,8 @@
     private RemoteProducer createProducer(int id, Destination destination) {
         RemoteProducer producer = new RemoteProducer();
         producer.setBroker(sendBroker);
-        producer.setProducerId(id+1);
-        producer.setName("producer" +(id+1));
+        producer.setProducerId(id + 1);
+        producer.setName("producer" + (id + 1));
         producer.setDestination(destination);
         producer.setMessageIdGenerator(msgIdGenerator);
         producer.setTotalProducerRate(totalProducerRate);
@@ -432,7 +436,7 @@
         queue.setBroker(broker);
         queue.setDestination(destination);
         queue.setKeyExtractor(KEY_MAPPER);
-        if( usePartitionedQueue ) {
+        if (usePartitionedQueue) {
             queue.setPartitionMapper(PARTITION_MAPPER);
         }
         return queue;