You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/05 11:30:10 UTC

svn commit: r895975 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/usage/ activemq-core/src/test/java/org/apache/activemq/ activemq-core/src/test/java/org/apache/acti...

Author: gtully
Date: Tue Jan  5 10:29:19 2010
New Revision: 895975

URL: http://svn.apache.org/viewvc?rev=895975&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2523 - have Usage use a shared executor with a limited pool of 10 threads and an unbounded queue. Fix potential dropped dispatch attempts when messages waiting for space fail due to memory again being full, can result is hung consumer. In this case, a new notification for not full needs to be registered. Added relevant test case.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=895975&r1=895974&r2=895975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jan  5 10:29:19 2010
@@ -406,8 +406,7 @@
                 }
 
                 // We can avoid blocking due to low usage if the producer is sending
-                // a sync message or
-                // if it is using a producer window
+                // a sync message or if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
                     // copy the exchange state since the context will be modified while we are waiting
                     // for space.
@@ -441,17 +440,14 @@
                                         ExceptionResponse response = new ExceptionResponse(e);
                                         response.setCorrelationId(message.getCommandId());
                                         context.getConnection().dispatchAsync(response);
+                                    } else {
+                                        LOG.debug("unexpected exception on deferred send of :" + message, e);
                                     }
                                 }
                             }
                         });
 
-                        // If the user manager is not full, then the task will not
-                        // get called..
-                        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
-                            // so call it directly here.
-                            sendMessagesWaitingForSpaceTask.run();
-                        }
+                        registerCallbackForNotFullNotification();
                         context.setDontSendReponse(true);
                         return;
                     }
@@ -482,6 +478,15 @@
         }
     }
 
+    private void registerCallbackForNotFullNotification() {
+        // If the usage manager is not full, then the task will not
+        // get called..
+        if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
+            // so call it directly here.
+            sendMessagesWaitingForSpaceTask.run();
+        }
+    }
+
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         synchronized (sendLock) {
@@ -1069,9 +1074,14 @@
 
             // do early to allow dispatch of these waiting messages
             synchronized (messagesWaitingForSpace) {
-                while (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
-                    Runnable op = messagesWaitingForSpace.removeFirst();
-                    op.run();
+                while (!messagesWaitingForSpace.isEmpty()) {
+                    if (!memoryUsage.isFull()) {
+                        Runnable op = messagesWaitingForSpace.removeFirst();
+                        op.run();
+                    } else {
+                        registerCallbackForNotFullNotification();
+                        break;
+                    }
                 }
             }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=895975&r1=895974&r2=895975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Tue Jan  5 10:29:19 2010
@@ -43,6 +43,7 @@
 public abstract class Usage<T extends Usage> implements Service {
 
     private static final Log LOG = LogFactory.getLog(Usage.class);
+    private static ThreadPoolExecutor executor;
     protected final Object usageMutex = new Object();
     protected int percentUsage;
     protected T parent;
@@ -55,7 +56,7 @@
     private List<T> children = new CopyOnWriteArrayList<T>();
     private final List<Runnable> callbacks = new LinkedList<Runnable>();
     private int pollingTime = 100;
-    private volatile ThreadPoolExecutor executor;
+    
     private AtomicBoolean started=new AtomicBoolean();
 
     public Usage(T parent, String name, float portion) {
@@ -247,28 +248,30 @@
             if (oldPercentUsage >= 100 && newPercentUsage < 100) {
                 synchronized (usageMutex) {
                     usageMutex.notifyAll();
-                    for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
-                        Runnable callback = iter.next();
-                        getExecutor().execute(callback);
+                    if (!callbacks.isEmpty()) {
+                        for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
+                            Runnable callback = iter.next();
+                            getExecutor().execute(callback);
+                        }
+                        callbacks.clear();
                     }
-                    callbacks.clear();
                 }
             }
-            // Let the listeners know on a separate thread
-            Runnable listenerNotifier = new Runnable() {
-            
-                public void run() {
-                    for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
-                        UsageListener l = iter.next();
-                        l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+            if (!listeners.isEmpty()) {
+                // Let the listeners know on a separate thread
+                Runnable listenerNotifier = new Runnable() {
+                    public void run() {
+                        for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
+                            UsageListener l = iter.next();
+                            l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
+                        }
                     }
+                };
+                if (started.get()) {
+                    getExecutor().execute(listenerNotifier);
+                } else {
+                    LOG.warn("Not notifying memory usage change to listeners on shutdown");
                 }
-            
-            };
-            if (started.get()) {
-                getExecutor().execute(listenerNotifier);
-            } else {
-                LOG.warn("Not notifying memory usage change to listeners on shutdown");
             }
         }
     }
@@ -299,9 +302,7 @@
             if (parent != null) {
                 parent.removeChild(this);
             }
-            if (this.executor != null){
-                this.executor.shutdownNow();
-            }
+            
             //clear down any callbacks
             synchronized (usageMutex) {
                 usageMutex.notifyAll();
@@ -402,22 +403,17 @@
     }
     
     protected Executor getExecutor() {
-        if (this.executor == null) {
-        	synchronized(usageMutex) {
-        		if (this.executor == null) {
-		            this.executor = new ThreadPoolExecutor(1, 1, 0,
-		                    TimeUnit.NANOSECONDS,
-		                    new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-		                        public Thread newThread(Runnable runnable) {
-		                            Thread thread = new Thread(runnable, getName()
-		                                    + " Usage Thread Pool");
-		                            thread.setDaemon(true);
-		                            return thread;
-		                        }
-		                    });
-        		}
-        	}
-        }
-        return this.executor;
+        return executor;
     }
+    
+    static {
+        executor = new ThreadPoolExecutor(10, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            public Thread newThread(Runnable runnable) {
+                Thread thread = new Thread(runnable, "Usage Async Task");
+                thread.setDaemon(true);
+                return thread;
+            }
+        });
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java?rev=895975&r1=895974&r2=895975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java Tue Jan  5 10:29:19 2010
@@ -63,6 +63,11 @@
     }
     
     @Override
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        // sendFail means no flowControllwindow as there is no producer ack, just an exception
+    }
+    
+    @Override
     public void testPubisherRecoverAfterBlock() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
         // with sendFail, there must be no flowControllwindow

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?rev=895975&r1=895974&r2=895975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java Tue Jan  5 10:29:19 2010
@@ -37,9 +37,11 @@
 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class ProducerFlowControlTest extends JmsTestSupport {
-
+    static final Log LOG = LogFactory.getLog(ProducerFlowControlTest.class);
     ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
     ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
     protected TransportConnector connector;
@@ -80,8 +82,6 @@
 
     public void testPubisherRecoverAfterBlock() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
-        factory.setProducerWindowSize(1024 * 64);
-        factory.setUseAsyncSend(true);
         connection = (ActiveMQConnection)factory.createConnection();
         connections.add(connection);
         connection.start();
@@ -94,12 +94,14 @@
         
    
 		Thread thread = new Thread("Filler") {
+		    int i;
 			@Override
 			public void run() {
                 while (keepGoing.get()) {
                     done.set(false);
                     try {
-						producer.send(session.createTextMessage("Test message"));
+						producer.send(session.createTextMessage("Test message " + ++i));
+						LOG.info("sent: " + i);
 					} catch (JMSException e) {
 					}
                 }
@@ -114,14 +116,63 @@
         TextMessage msg;
         for (int idx = 0; idx < 5; ++idx) {
         	msg = (TextMessage) consumer.receive(1000);
+        	LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
         	msg.acknowledge();
         }
         Thread.sleep(1000);
         keepGoing.set(false);
     	
-		assertFalse(done.get());
+		assertFalse("producer has resumed", done.get());
     }
-    
+
+    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+        factory.setProducerWindowSize(1024 * 5);
+        factory.setUseAsyncSend(true);
+        connection = (ActiveMQConnection)factory.createConnection();
+        connections.add(connection);
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(queueA);
+        
+        final AtomicBoolean done = new AtomicBoolean(true);
+        final AtomicBoolean keepGoing = new AtomicBoolean(true);
+        
+   
+        Thread thread = new Thread("Filler") {
+            int i;
+            @Override
+            public void run() {
+                while (keepGoing.get()) {
+                    done.set(false);
+                    try {
+                        producer.send(session.createTextMessage("Test message " + ++i));
+                        LOG.info("sent: " + i);
+                    } catch (JMSException e) {
+                    }
+                }
+            }
+        };
+        thread.start();
+        waitForBlockedOrResourceLimit(done);
+
+        // after receiveing messges, producer should continue sending messages 
+        // (done == false)
+        MessageConsumer consumer = session.createConsumer(queueA);
+        TextMessage msg;
+        for (int idx = 0; idx < 5; ++idx) {
+            msg = (TextMessage) consumer.receive(1000);
+            assertNotNull("Got a message", msg);
+            LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID());
+            msg.acknowledge();
+        }
+        Thread.sleep(1000);
+        keepGoing.set(false);
+        
+        assertFalse("producer has resumed", done.get());
+    }
+
     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
         factory.setAlwaysSyncSend(true);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java?rev=895975&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java Tue Jan  5 10:29:19 2010
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MemoryUsageTest {
+
+    MemoryUsage underTest;
+      
+    @Test
+    public final void testPercentUsageNeedsNoThread() {    
+        int activeThreadCount = Thread.activeCount();
+        underTest.setLimit(10);
+        underTest.start();
+        underTest.increaseUsage(1);
+        assertEquals("usage is correct", 10, underTest.getPercentUsage());
+        assertEquals("no new thread created withough listener or callback",activeThreadCount, Thread.activeCount()); 
+    }
+    
+    @Test
+    public final void testAddUsageListenerStartsThread() throws Exception {       
+        int activeThreadCount = Thread.activeCount();
+        underTest = new MemoryUsage();
+        underTest.setLimit(10);
+        underTest.start();
+        final CountDownLatch called = new CountDownLatch(1);
+        final String[] listnerThreadNameHolder = new String[1];
+        underTest.addUsageListener(new UsageListener() {
+            public void onUsageChanged(Usage usage, int oldPercentUsage,
+                    int newPercentUsage) {
+                called.countDown();
+                listnerThreadNameHolder[0] = Thread.currentThread().toString();
+            }
+        });
+        underTest.increaseUsage(1);
+        assertTrue("listner was called", called.await(30, TimeUnit.SECONDS));
+        assertTrue("listner called from another thread", !Thread.currentThread().toString().equals(listnerThreadNameHolder[0]));
+        assertEquals("usage is correct", 10, underTest.getPercentUsage());
+        assertEquals("new thread created with listener", activeThreadCount + 1, Thread.activeCount());        
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        underTest = new MemoryUsage();   
+    }
+    
+    @After
+    public void tearDown() {
+        assertNotNull(underTest);
+        underTest.stop();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java?rev=895975&r1=895974&r2=895975&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/https/HttpsTransportBrokerTest.java Tue Jan  5 10:29:19 2010
@@ -36,7 +36,8 @@
         //System.setProperty("javax.net.debug", "ssl,handshake,data,trustmanager");
         super.setUp();
 
-        Thread.sleep(1000);
+        Thread.sleep(2000);
+        Thread.yield();
     }
 
     protected void tearDown() throws Exception {