You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2019/06/21 15:55:31 UTC

[activemq] branch master updated: AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new de3f770  AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test
de3f770 is described below

commit de3f77063fa579159184d23ac98df97e80d63327
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Jun 21 16:55:13 2019 +0100

    AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test
---
 .../org/apache/activemq/broker/region/Queue.java   |   2 +-
 .../org/apache/activemq/broker/region/Topic.java   |  15 ++-
 .../org/apache/activemq/usage/MemoryUsage.java     |  11 +-
 .../org/apache/activemq/usage/MemoryUsageTest.java |  17 ++-
 .../usecases/TopicProducerFlowControlTest.java     | 118 +++++++++++++++++++++
 5 files changed, 153 insertions(+), 10 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e8ef717..fc4442c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -694,7 +694,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                                     // While waiting for space to free up... the
                                     // transaction may be done
                                     if (message.isInTransaction()) {
-                                        if (context.getTransaction().getState() > IN_USE_STATE) {
+                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
                                             throw new JMSException("Send transaction completed while waiting for space");
                                         }
                                     }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index a0f2d06..66e586c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -63,6 +63,10 @@ import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+
+import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
+
 /**
  * The Topic is a destination that sends a copy of a message to every active
  * Subscription registered.
@@ -409,8 +413,15 @@ public class Topic extends BaseDestination implements Task {
                             public void run() {
                                 try {
 
-                                    // While waiting for space to free up... the
-                                    // message may have expired.
+                                    // While waiting for space to free up...
+                                    // the transaction may be done
+                                    if (message.isInTransaction()) {
+                                        if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
+                                            throw new JMSException("Send transaction completed while waiting for space");
+                                        }
+                                    }
+
+                                    // the message may have expired.
                                     if (message.isExpired()) {
                                         broker.messageExpired(context, message, null);
                                         getDestinationStatistics().getExpired().increment();
diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 8c38c2d..40d2f1a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -94,7 +94,7 @@ public class MemoryUsage extends Usage<MemoryUsage> {
      * @return true if space
      */
     @Override
-    public boolean waitForSpace(long timeout) throws InterruptedException {
+    public boolean waitForSpace(final long timeout) throws InterruptedException {
         if (parent != null) {
             if (!parent.waitForSpace(timeout)) {
                 return false;
@@ -106,12 +106,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
                 usageLock.readLock().unlock();
                 usageLock.writeLock().lock();
                 try {
-                    while (percentUsage >= 100 ) {
-                        waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+                    final long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+                    long timeleft = deadline;
+                    while (percentUsage >= 100 && timeleft > 0) {
+                        waitForSpaceCondition.await(Math.min(getPollingTime(), timeleft), TimeUnit.MILLISECONDS);
+                        timeleft = deadline - System.currentTimeMillis();
                     }
-                    usageLock.readLock().lock();
                 } finally {
                     usageLock.writeLock().unlock();
+                    usageLock.readLock().lock();
                 }
             }
 
diff --git a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
index b869939..24e47c2 100644
--- a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
+++ b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.activemq.usage;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -29,6 +26,11 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 public class MemoryUsageTest {
 
     MemoryUsage underTest;
@@ -83,6 +85,15 @@ public class MemoryUsageTest {
         assertEquals("limits are still matched whole", underTest.getLimit(), child.getLimit());
     }
 
+    @Test(timeout=2000)
+    public void testLimitedWaitFail() throws Exception {
+        underTest.setLimit(10);
+        underTest.start();
+        underTest.increaseUsage(11);
+
+        assertFalse("did not get usage within limit", underTest.waitForSpace(500));
+    }
+
     @Before
     public void setUp() throws Exception {
         underTest = new MemoryUsage();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 76f54ff..9beb4d6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.usecases;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,9 +34,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.DefaultTestAppender;
 import org.apache.activemq.util.Wait;
@@ -57,6 +60,8 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
     private BrokerService broker;
 
     protected void setUp() throws Exception {
+        produced.set(0);
+        consumed.set(0);
         // Setup and start the broker
         broker = new BrokerService();
         broker.setBrokerName(brokerName);
@@ -202,6 +207,119 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
         }
     }
 
+
+    public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
+        doTestTransactedProducerBlockedAndClosedWillRelease(false);
+    }
+
+    public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
+        doTestTransactedProducerBlockedAndClosedWillRelease(true);
+    }
+
+    public void doTestTransactedProducerBlockedAndClosedWillRelease(final boolean alwaysSyncSend) throws Exception {
+
+        // Create the connection factory
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+        connectionFactory.setWatchTopicAdvisories(false);
+        connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
+        Connection c = connectionFactory.createConnection();
+        c.start();
+
+
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setAll(5000);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+        // Start the test destination listener
+        Session listenerSession = c.createSession(false, 1);
+        Destination destination = createDestination(listenerSession);
+
+
+        final AtomicInteger warnings = new AtomicInteger();
+        Appender appender = new DefaultTestAppender() {
+            @Override
+            public void doAppend(LoggingEvent event) {
+                if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
+                    LOG.info("received  log message: " + event.getMessage());
+                    warnings.incrementAndGet();
+                }
+            }
+        };
+        org.apache.log4j.Logger log4jLogger =
+                org.apache.log4j.Logger.getLogger(Topic.class);
+        log4jLogger.addAppender(appender);
+        try {
+
+            // Start producing the test messages
+            final Session session = connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer producer = session.createProducer(destination);
+
+            Thread producingThread = new Thread("Producing Thread") {
+                public void run() {
+                    try {
+                        for (long i = 0; i < numMessagesToSend; i++) {
+                            producer.send(session.createTextMessage("test"));
+
+                            long count = produced.incrementAndGet();
+                            if (count % 10000 == 0) {
+                                LOG.info("Produced " + count + " messages");
+                            }
+                        }
+                    } catch (Throwable ex) {
+                        ex.printStackTrace();
+                    } finally {
+                        try {
+                            producer.close();
+                            session.close();
+                        } catch (Exception e) {
+                        }
+                    }
+                }
+            };
+
+            producingThread.start();
+
+
+            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return warnings.get() > 0;
+                }
+            }, 5 * 1000));
+
+
+            LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
+
+            assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    return warnings.get() > 0;
+                }
+            }, 5 * 1000));
+
+
+            final long enqueueCountWhenBlocked = broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount();
+
+            // now whack the hung connection broker side (mimic jmx), and verify usage gone b/c of rollback
+            for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
+                transportConnection.serviceException(new IOException("forcing close for hung connection"));
+            }
+
+            assertTrue("Usage gets released on close", Wait.waitFor(new Wait.Condition() {
+                public boolean isSatisified() throws Exception {
+                    LOG.info("Usage: " + broker.getSystemUsage().getMemoryUsage().getUsage());
+
+                    return broker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+                }
+            }, 5 * 1000));
+
+            c.close();
+
+            // verify no pending sends completed in rolledback tx
+            assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
+
+        } finally {
+            log4jLogger.removeAppender(appender);
+        }
+    }
+
     protected Destination createDestination(Session listenerSession) throws Exception {
         return new ActiveMQTopic("test");
     }