You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/07/08 10:40:45 UTC
[activemq] branch activemq-5.15.x updated: AMQ-7234 - fix up memory
usage wait timeout such that topic pfc in a transaction can see connection
context state changes, fix and test
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new ede4dbc AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test
ede4dbc is described below
commit ede4dbc8649792628b1135db9cdf9b06fd6844df
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Jun 21 16:55:13 2019 +0100
AMQ-7234 - fix up memory usage wait timeout such that topic pfc in a transaction can see connection context state changes, fix and test
(cherry picked from commit de3f77063fa579159184d23ac98df97e80d63327)
---
.../org/apache/activemq/broker/region/Queue.java | 2 +-
.../org/apache/activemq/broker/region/Topic.java | 15 ++-
.../org/apache/activemq/usage/MemoryUsage.java | 11 +-
.../org/apache/activemq/usage/MemoryUsageTest.java | 17 ++-
.../usecases/TopicProducerFlowControlTest.java | 118 +++++++++++++++++++++
5 files changed, 153 insertions(+), 10 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2173c08..71bf5bb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -670,7 +670,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// While waiting for space to free up... the
// transaction may be done
if (message.isInTransaction()) {
- if (context.getTransaction().getState() > IN_USE_STATE) {
+ if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
throw new JMSException("Send transaction completed while waiting for space");
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 8b46475..c30916c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -62,6 +62,10 @@ import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.JMSException;
+
+import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
+
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
@@ -403,8 +407,15 @@ public class Topic extends BaseDestination implements Task {
public void run() {
try {
- // While waiting for space to free up... the
- // message may have expired.
+ // While waiting for space to free up...
+ // the transaction may be done
+ if (message.isInTransaction()) {
+ if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
+ throw new JMSException("Send transaction completed while waiting for space");
+ }
+ }
+
+ // the message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 8c38c2d..40d2f1a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -94,7 +94,7 @@ public class MemoryUsage extends Usage<MemoryUsage> {
* @return true if space
*/
@Override
- public boolean waitForSpace(long timeout) throws InterruptedException {
+ public boolean waitForSpace(final long timeout) throws InterruptedException {
if (parent != null) {
if (!parent.waitForSpace(timeout)) {
return false;
@@ -106,12 +106,15 @@ public class MemoryUsage extends Usage<MemoryUsage> {
usageLock.readLock().unlock();
usageLock.writeLock().lock();
try {
- while (percentUsage >= 100 ) {
- waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+ final long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+ long timeleft = deadline;
+ while (percentUsage >= 100 && timeleft > 0) {
+ waitForSpaceCondition.await(Math.min(getPollingTime(), timeleft), TimeUnit.MILLISECONDS);
+ timeleft = deadline - System.currentTimeMillis();
}
- usageLock.readLock().lock();
} finally {
usageLock.writeLock().unlock();
+ usageLock.readLock().lock();
}
}
diff --git a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
index b869939..24e47c2 100644
--- a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
+++ b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
@@ -17,9 +17,6 @@
package org.apache.activemq.usage;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -29,6 +26,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
public class MemoryUsageTest {
MemoryUsage underTest;
@@ -83,6 +85,15 @@ public class MemoryUsageTest {
assertEquals("limits are still matched whole", underTest.getLimit(), child.getLimit());
}
+ @Test(timeout=2000)
+ public void testLimitedWaitFail() throws Exception {
+ underTest.setLimit(10);
+ underTest.start();
+ underTest.increaseUsage(11);
+
+ assertFalse("did not get usage within limit", underTest.waitForSpace(500));
+ }
+
@Before
public void setUp() throws Exception {
underTest = new MemoryUsage();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 1574ec9..4613af3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.usecases;
+import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,9 +34,11 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
@@ -57,6 +60,8 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
private BrokerService broker;
protected void setUp() throws Exception {
+ produced.set(0);
+ consumed.set(0);
// Setup and start the broker
broker = new BrokerService();
broker.setBrokerName(brokerName);
@@ -202,6 +207,119 @@ public class TopicProducerFlowControlTest extends TestCase implements MessageLis
}
}
+
+ public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
+ doTestTransactedProducerBlockedAndClosedWillRelease(false);
+ }
+
+ public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
+ doTestTransactedProducerBlockedAndClosedWillRelease(true);
+ }
+
+ public void doTestTransactedProducerBlockedAndClosedWillRelease(final boolean alwaysSyncSend) throws Exception {
+
+ // Create the connection factory
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ connectionFactory.setWatchTopicAdvisories(false);
+ connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
+ Connection c = connectionFactory.createConnection();
+ c.start();
+
+
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setAll(5000);
+ connectionFactory.setPrefetchPolicy(prefetchPolicy);
+ // Start the test destination listener
+ Session listenerSession = c.createSession(false, 1);
+ Destination destination = createDestination(listenerSession);
+
+
+ final AtomicInteger warnings = new AtomicInteger();
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
+ LOG.info("received log message: " + event.getMessage());
+ warnings.incrementAndGet();
+ }
+ }
+ };
+ org.apache.log4j.Logger log4jLogger =
+ org.apache.log4j.Logger.getLogger(Topic.class);
+ log4jLogger.addAppender(appender);
+ try {
+
+ // Start producing the test messages
+ final Session session = connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
+ final MessageProducer producer = session.createProducer(destination);
+
+ Thread producingThread = new Thread("Producing Thread") {
+ public void run() {
+ try {
+ for (long i = 0; i < numMessagesToSend; i++) {
+ producer.send(session.createTextMessage("test"));
+
+ long count = produced.incrementAndGet();
+ if (count % 10000 == 0) {
+ LOG.info("Produced " + count + " messages");
+ }
+ }
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ producer.close();
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ };
+
+ producingThread.start();
+
+
+ assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return warnings.get() > 0;
+ }
+ }, 5 * 1000));
+
+
+ LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
+
+ assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return warnings.get() > 0;
+ }
+ }, 5 * 1000));
+
+
+ final long enqueueCountWhenBlocked = broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount();
+
+ // now whack the hung connection broker side (mimic jmx), and verify usage gone b/c of rollback
+ for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
+ transportConnection.serviceException(new IOException("forcing close for hung connection"));
+ }
+
+ assertTrue("Usage gets released on close", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ LOG.info("Usage: " + broker.getSystemUsage().getMemoryUsage().getUsage());
+
+ return broker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+ }
+ }, 5 * 1000));
+
+ c.close();
+
+ // verify no pending sends completed in rolledback tx
+ assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
+
+ } finally {
+ log4jLogger.removeAppender(appender);
+ }
+ }
+
protected Destination createDestination(Session listenerSession) throws Exception {
return new ActiveMQTopic("test");
}