You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/27 20:53:09 UTC

svn commit: r748660 - in /activemq/sandbox/activemq-flow/src: main/java/org/apache/activemq/dispatch/ main/java/org/apache/activemq/flow/ test/java/org/apache/activemq/flow/

Author: chirino
Date: Fri Feb 27 19:53:09 2009
New Revision: 748660

URL: http://svn.apache.org/viewvc?rev=748660&view=rev
Log:
Applying colins's patch from https://issues.apache.org/activemq/browse/AMQ-2141

Modified:
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
    activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
    activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/IDispatcher.java Fri Feb 27 19:53:09 2009
@@ -77,9 +77,10 @@
 
         /**
          * This must be called to release any resource the dispatcher is holding
-         * on behalf of this context.
+         * on behalf of this context. Once called this {@link DispatchContext} should
+         * no longer be used. 
          */
-        public void close();
+        public void close(boolean sync);
     }
 
     public class RunnableAdapter implements Dispatchable, Runnable {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/dispatch/PriorityDispatcher.java Fri Feb 27 19:53:09 2009
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.dispatch;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
@@ -35,12 +38,13 @@
     protected boolean running = false;
     private boolean threaded = false;
     protected final int MAX_USER_PRIORITY;
+    protected final HashSet<PriorityDispatchContext> contexts = new HashSet<PriorityDispatchContext>();
 
     // Set if this dispatcher is part of a dispatch pool:
     protected final PooledDispatcher<D> pooledDispatcher;
 
     // The local dispatch queue:
-    private final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
+    protected final PriorityLinkedList<PriorityDispatchContext> priorityQueue;
 
     // Dispatch queue for requests from other threads:
     private final LinkedNodeList<ForeignEvent>[] foreignQueue;
@@ -157,16 +161,35 @@
      * 
      * @see org.apache.activemq.dispatch.IDispatcher#shutdown()
      */
-    public synchronized void shutdown() throws InterruptedException {
-        if (thread != null) {
-            dispatch(new RunnableAdapter() {
-                public void run() {
-                    running = false;
-                }
-            }, MAX_USER_PRIORITY + 1);
+    public void shutdown() throws InterruptedException {
+        Thread joinThread = null;
+        synchronized (this) {
+            if (thread != null) {
+                dispatch(new RunnableAdapter() {
+                    public void run() {
+                        running = false;
+                    }
+                }, MAX_USER_PRIORITY + 1);
+                joinThread = thread;
+                thread = null;
+            }
+        }
+        if (joinThread != null) {
             // thread.interrupt();
-            thread.join();
-            thread = null;
+            joinThread.join();
+        }
+    }
+
+    protected void cleanup() {
+        ArrayList<PriorityDispatchContext> toClose = null;
+        synchronized (this) {
+            running = false;
+            toClose = new ArrayList<PriorityDispatchContext>(contexts.size());
+            toClose.addAll(contexts);
+        }
+
+        for (PriorityDispatchContext context : toClose) {
+            context.close(false);
         }
     }
 
@@ -235,6 +258,7 @@
             thrown.printStackTrace();
         } finally {
             pooledDispatcher.onDispatcherStopped((D) this);
+            cleanup();
         }
     }
 
@@ -261,7 +285,7 @@
         foreignPermits.release();
     }
 
-    protected final void onForeignUdate(PriorityDispatchContext context) {
+    protected final void onForeignUpdate(PriorityDispatchContext context) {
         synchronized (foreignQueue) {
 
             ForeignEvent fe = context.updateEvent[foreignToggle];
@@ -283,14 +307,31 @@
             if (context.updateEvent[1].isLinked()) {
                 context.updateEvent[1].unlink();
             }
-            if (context.isLinked()) {
-                context.unlink();
-                return true;
-            }
         }
+
+        if (context.isLinked()) {
+            context.unlink();
+            return true;
+        }
+
+        synchronized (this) {
+            contexts.remove(context);
+        }
+
         return false;
     }
 
+    protected final boolean takeOwnership(PriorityDispatchContext context) {
+        synchronized (this) {
+            if (running) {
+                contexts.add(context);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
     /*
      * (non-Javadoc)
      * 
@@ -377,11 +418,13 @@
         private int priority;
         private boolean dispatchRequested = false;
         private boolean closed = false;
+        final CountDownLatch closeLatch = new CountDownLatch(1);
 
         protected PriorityDispatchContext(Dispatchable dispatchable, boolean persistent, String name) {
             this.dispatchable = dispatchable;
             this.name = name;
             this.currentOwner = (D) PriorityDispatcher.this;
+            this.currentOwner.contexts.add(this);
             if (persistent) {
                 this.tracker = pooledDispatcher.getLoadBalancer().createExecutionTracker((PooledDispatchContext<D>) this);
             } else {
@@ -428,15 +471,14 @@
                 updateDispatcher = newDispatcher;
                 if (DEBUG)
                     System.out.println(getName() + " updating to " + updateDispatcher);
+
+                currentOwner.onForeignUpdate(this);
             }
-            currentOwner.onForeignUdate(this);
+
         }
 
         public void requestDispatch() {
 
-            if (closed) {
-                throw new RejectedExecutionException();
-            }
             D callingDispatcher = getCurrentDispatcher();
             if (tracker != null)
                 tracker.onDispatchRequest(callingDispatcher, getCurrentDispatchContext());
@@ -449,6 +491,12 @@
                 // delegate to the dispatcher.
                 if (currentOwner == callingDispatcher) {
 
+                    if (!currentOwner.running) {
+                        // TODO In the event that the current dispatcher
+                        // failed due to a runtime exception, we could
+                        // try to switch to a new dispatcher.
+                        throw new RejectedExecutionException();
+                    }
                     if (!isLinked()) {
                         currentOwner.priorityQueue.add(this, listPrio);
                     }
@@ -456,12 +504,16 @@
                 }
 
                 dispatchRequested = true;
+                currentOwner.onForeignUpdate(this);
             }
-            // FIXME Thread safety!
-            currentOwner.onForeignUdate(this);
         }
 
         public void updatePriority(int priority) {
+
+            if (closed) {
+                return;
+            }
+
             if (this.priority == priority) {
                 return;
             }
@@ -470,6 +522,9 @@
             // Otherwise this is coming off another thread, so we need to
             // synchronize to protect against ownership changes:
             synchronized (this) {
+                if (closed) {
+                    return;
+                }
                 this.priority = priority;
 
                 // If this is called by the owning dispatcher, then we go ahead
@@ -488,30 +543,34 @@
                     }
                     return;
                 }
+
+                currentOwner.onForeignUpdate(this);
             }
-            // FIXME Thread safety!
-            currentOwner.onForeignUdate(this);
+
         }
 
         public void processForeignUpdates() {
-            boolean ownerChange = false;
             synchronized (this) {
 
                 if (closed) {
-                    close();
+                    close(false);
                     return;
                 }
 
-                if (updateDispatcher != null) {
+                if (updateDispatcher != null && updateDispatcher.takeOwnership(this)) {
                     if (DEBUG) {
                         System.out.println("Assigning " + getName() + " to " + updateDispatcher);
                     }
+
                     if (currentOwner.removeDispatchContext(this)) {
                         dispatchRequested = true;
                     }
+
+                    updateDispatcher.onForeignUpdate(this);
+                    switchedDispatcher(currentOwner, updateDispatcher);
                     currentOwner = updateDispatcher;
                     updateDispatcher = null;
-                    ownerChange = true;
+
                 } else {
                     updatePriority(priority);
 
@@ -521,10 +580,6 @@
                     }
                 }
             }
-
-            if (ownerChange) {
-                currentOwner.onForeignUdate(this);
-            }
         }
 
         /**
@@ -539,27 +594,40 @@
 
         }
 
-        public void close() {
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public void close(boolean sync) {
             D callingDispatcher = getCurrentDispatcher();
+            // System.out.println(this + "Closing");
             synchronized (this) {
                 closed = true;
                 // If the owner of this context is the calling thread, then
                 // delegate to the dispatcher.
                 if (currentOwner == callingDispatcher) {
-                    if (isLinked()) {
-                        unlink();
-                    }
-                    tracker.close();
+                    removeDispatchContext(this);
+                    closeLatch.countDown();
+                    return;
+                }
+            }
 
-                    // FIXME Deadlock potential!
-                    synchronized (foreignQueue) {
-                        if (updateEvent[foreignToggle].isLinked()) {
-                            updateEvent[foreignToggle].unlink();
-                        }
+            currentOwner.onForeignUpdate(this);
+            if (sync) {
+                boolean interrupted = false;
+                while (true) {
+                    try {
+                        closeLatch.await();
+                        break;
+                    } catch (InterruptedException e) {
+                        interrupted = true;
                     }
                 }
+
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
             }
-            currentOwner.onForeignUdate(this);
         }
 
         public final String toString() {

Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java Fri Feb 27 19:53:09 2009
@@ -399,7 +399,6 @@
                     }
                     String was = Thread.currentThread().getName();
                     try {
-                        Thread.currentThread().setName(name);
                         for (ISourceController<E> source : blockedSources) {
                             // System.out.println("UNBLOCKING: SINK[" +
                             // FlowController.this + "], SOURCE[" + source +
@@ -420,7 +419,6 @@
                             resumeScheduled = false;
                             mutex.notifyAll();
                         }
-                        Thread.currentThread().setName(was);
                     }
                 }
             };

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/MockBrokerTest.java Fri Feb 27 19:53:09 2009
@@ -32,7 +32,6 @@
 import org.apache.activemq.metric.Period;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.queue.Mapper;
-import org.apache.activemq.transport.nio.SelectorManager;
 
 public class MockBrokerTest extends TestCase {
 
@@ -110,7 +109,7 @@
     protected IDispatcher createDispatcher() {
         return PriorityDispatcher.createPriorityDispatchPool("BrokerDispatcher", Message.MAX_PRIORITY, asyncThreadPoolSize);
     }
-
+    
     public void test_1_1_0() throws Exception {
         producerCount = 1;
         destCount = 1;
@@ -235,7 +234,7 @@
         consumerCount = 2;
 
         createConnections();
-        rcvBroker.consumers.get(0).setThinkTime(5);
+        rcvBroker.consumers.get(0).setThinkTime(50);
 
         // Start 'em up.
         startServices();
@@ -451,19 +450,19 @@
     }
 
     private void stopServices() throws Exception {
-        if (dispatcher != null) {
-            dispatcher.shutdown();
-        }
         for (MockBroker broker : brokers) {
             broker.stopServices();
         }
+        if (dispatcher != null) {
+            dispatcher.shutdown();
+        }
     }
 
     private void startServices() throws Exception {
         for (MockBroker broker : brokers) {
             broker.startServices();
         }
-        SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
+        //SelectorManager.SINGLETON.setChannelExecutor(dispatcher.createPriorityExecutor(PRIORITY_LEVELS));
     }
 
 }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/PipeTransportFactory.java Fri Feb 27 19:53:09 2009
@@ -63,7 +63,7 @@
 
         public void stop() throws Exception {
             if (readContext != null) {
-                readContext.close();
+                readContext.close(true);
             } else {
                 stopping.set(true);
                 thread.join();

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteConnection.java Fri Feb 27 19:53:09 2009
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.dispatch.IDispatcher;
@@ -39,7 +40,7 @@
     private final int outputResumeThreshold = 900;
 
     private final int inputWindowSize = 1000;
-    private final int inputResumeThreshold = 900;
+    private final int inputResumeThreshold = 500;
 
     private IDispatcher dispatcher;
     private final AtomicBoolean stopping = new AtomicBoolean();
@@ -181,17 +182,21 @@
                     onException(e);
                 }
             } else {
-                blockingWriter.execute(new Runnable() {
-                    public void run() {
-                        if (!stopping.get()) {
-                            try {
-                                transport.oneway(o);
-                            } catch (IOException e) {
-                                onException(e);
+                try {
+                    blockingWriter.execute(new Runnable() {
+                        public void run() {
+                            if (!stopping.get()) {
+                                try {
+                                    transport.oneway(o);
+                                } catch (IOException e) {
+                                    onException(e);
+                                }
                             }
                         }
-                    }
-                });
+                    });
+                } catch (RejectedExecutionException re) {
+                    //Must be shutting down.
+                }
             }
         }
     }

Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java?rev=748660&r1=748659&r2=748660&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/flow/RemoteProducer.java Fri Feb 27 19:53:09 2009
@@ -65,7 +65,7 @@
     
     public void stop() throws Exception
     {
-    	dispatchContext.close();
+    	dispatchContext.close(false);
     	super.stop();
     }