You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/25 14:19:48 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6477
Repository: activemq
Updated Branches:
refs/heads/master 114706a7b -> 7c3bb4010
https://issues.apache.org/jira/browse/AMQ-6477
ReduceMemoryFootprint now applies to non-persistent messages if they
have been marshalled, and topics now clear memory after the recovery
policy check.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c3bb401
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c3bb401
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c3bb401
Branch: refs/heads/master
Commit: 7c3bb401007b4047c540287b53b435b20d3161c0
Parents: 114706a
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Oct 25 10:17:04 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Oct 25 10:19:03 2016 -0400
----------------------------------------------------------------------
.../apache/activemq/broker/region/Queue.java | 10 +-
.../apache/activemq/broker/region/Topic.java | 11 +-
.../activemq/command/ActiveMQMapMessage.java | 4 +-
.../activemq/command/ActiveMQObjectMessage.java | 4 +-
.../activemq/command/ActiveMQTextMessage.java | 4 +-
.../org/apache/activemq/command/Message.java | 15 +-
.../org/apache/activemq/bugs/AMQ2103Test.java | 15 +-
.../apache/activemq/usecases/AMQ6477Test.java | 188 +++++++++++++++++++
8 files changed, 236 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index a74fe3b..6a42ebc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -839,9 +839,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
} else {
store.addMessage(context, message);
}
- if (isReduceMemoryFootprint()) {
- message.clearMarshalledState();
- }
} catch (Exception e) {
// we may have a store in inconsistent state, so reset the cursor
// before restarting normal broker operations
@@ -849,6 +846,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
throw e;
}
}
+
+ //Clear the unmarshalled state if the message is marshalled
+ //Persistent messages will always be marshalled but non-persistent may not be
+ //In particular, non-persistent messages sent over the VM transport won't be marshalled
+ if (isReduceMemoryFootprint() && message.isMarshalled()) {
+ message.clearUnMarshalledState();
+ }
if(tryOrderedCursorAdd(message, context)) {
break;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 2aa6e18..0842467 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -516,9 +516,7 @@ public class Topic extends BaseDestination implements Task {
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
- if (isReduceMemoryFootprint()) {
- message.clearMarshalledState();
- }
+ //Moved the reduceMemoryfootprint clearing to the dispatch method
}
message.incrementReferenceCount();
@@ -758,6 +756,13 @@ public class Topic extends BaseDestination implements Task {
return;
}
}
+
+ // Clear memory before dispatch - need to clear here because the call to
+ //subscriptionRecoveryPolicy.add() will unmarshall the state
+ if (isReduceMemoryFootprint() && message.isMarshalled()) {
+ message.clearUnMarshalledState();
+ }
+
msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 7d80681..e1db3f7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -130,8 +130,8 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
}
@Override
- public void clearMarshalledState() throws JMSException {
- super.clearMarshalledState();
+ public void clearUnMarshalledState() throws JMSException {
+ super.clearUnMarshalledState();
map.clear();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
index 468ce79..8c5611f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
@@ -224,8 +224,8 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
}
@Override
- public void clearMarshalledState() throws JMSException {
- super.clearMarshalledState();
+ public void clearUnMarshalledState() throws JMSException {
+ super.clearUnMarshalledState();
this.object = null;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 97fc9e4..bb89378 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -153,8 +153,8 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
// see https://issues.apache.org/activemq/browse/AMQ-2103
// and https://issues.apache.org/activemq/browse/AMQ-2966
@Override
- public void clearMarshalledState() throws JMSException {
- super.clearMarshalledState();
+ public void clearUnMarshalledState() throws JMSException {
+ super.clearUnMarshalledState();
this.text = null;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/Message.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index 83f3201..13e0030 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -110,11 +110,24 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public abstract void storeContent();
public abstract void storeContentAndClear();
- // useful to reduce the memory footprint of a persisted message
+ /**
+ * @deprecated - This method is misnamed; use {@link #clearUnMarshalledState()} instead
+ * @throws JMSException
+ */
public void clearMarshalledState() throws JMSException {
+ clearUnMarshalledState();
+ }
+
+ // useful to reduce the memory footprint of a persisted message
+ public void clearUnMarshalledState() throws JMSException {
properties = null;
}
+ public boolean isMarshalled() {
+ return content != null && (marshalledProperties != null ||
+ (marshalledProperties == null && properties == null));
+ }
+
protected void copy(Message copy) {
super.copy(copy);
copy.producerId = producerId;
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
index 8a952fd..ae84fca 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
@@ -21,7 +21,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import junit.framework.Test;
+
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerTestSupport;
@@ -32,7 +32,16 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.usecases.MyObject;
+import org.junit.Ignore;
+
+import junit.framework.Test;
+
+/**
+ * AMQ-6477 changes the behavior to only clear memory if the marshalled state exists
+ * so this test no longer works
+ */
+@Ignore
public class AMQ2103Test extends BrokerTestSupport {
static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
static {
@@ -47,7 +56,7 @@ public class AMQ2103Test extends BrokerTestSupport {
}
public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
- addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});
+ addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});
}
public static Test suite() {
@@ -60,6 +69,8 @@ public class AMQ2103Test extends BrokerTestSupport {
* With vm transport and deferred serialisation and no persistence (mem persistence),
* we see the message as sent by the client so we can validate the contents against
* the policy
+ *
+ *
* @throws Exception
*/
public void testVerifyMarshalledStateIsCleared() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
new file mode 100644
index 0000000..02d9425
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that clearUnMarshalledState() gets called properly to reduce memory usage
+ */
+@RunWith(Parameterized.class)
+public class AMQ6477Test {
+
+ static final Logger LOG = LoggerFactory.getLogger(AMQ6477Test.class);
+
+ @Rule
+ public TemporaryFolder dataDir = new TemporaryFolder();
+
+ private BrokerService brokerService;
+ private String connectionUri;
+ private final ActiveMQQueue queue = new ActiveMQQueue("queue");
+ private final ActiveMQTopic topic = new ActiveMQTopic("topic");
+ private final int numMessages = 10;
+ private Connection connection;
+ private Session session;
+ private SubType subType;
+ private boolean persistent;
+
+ protected enum SubType {QUEUE, TOPIC, DURABLE};
+
+ @Parameters(name="subType={0},isPersistent={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {SubType.QUEUE, false},
+ {SubType.TOPIC, false},
+ {SubType.DURABLE, false},
+ {SubType.QUEUE, true},
+ {SubType.TOPIC, true},
+ {SubType.DURABLE, true}
+ });
+ }
+
+ /**
+ */
+ public AMQ6477Test(SubType subType, boolean persistent) {
+ super();
+ this.subType = subType;
+ this.persistent = persistent;
+ }
+
+ @Before
+ public void before() throws Exception {
+ brokerService = new BrokerService();
+ TransportConnector connector = brokerService.addConnector("tcp://localhost:0");
+ connectionUri = connector.getPublishableConnectString();
+ brokerService.setPersistent(persistent);
+ brokerService.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+ brokerService.setUseJmx(false);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry entry = new PolicyEntry();
+ entry.setReduceMemoryFootprint(true);
+
+ policyMap.setDefaultEntry(entry);
+ brokerService.setDestinationPolicy(policyMap);
+
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ connection = factory.createConnection();
+ connection.setClientID("clientId");
+ connection.start();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @After
+ public void after() throws Exception {
+ if (connection != null) {
+ connection.stop();
+ }
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testReduceMemoryFootprint() throws Exception {
+
+ ActiveMQDestination destination = subType.equals(SubType.QUEUE) ? queue : topic;
+
+ MessageConsumer consumer = subType.equals(SubType.DURABLE) ?
+ session.createDurableSubscriber(topic, "sub1") : session.createConsumer(destination);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage m = session.createTextMessage("test");
+ m.setStringProperty("test", "test");
+ producer.send(m);
+ }
+
+ Subscription sub = brokerService.getDestination(destination).getConsumers().get(0);
+ List<MessageReference> messages = getSubscriptionMessages(sub);
+
+ //Go through each message and make sure the unmarshalled fields are null
+ //then call the getters which will unmarshall the data again to show the marshalled
+ //data exists
+ for (MessageReference ref : messages) {
+ ActiveMQTextMessage message = (ActiveMQTextMessage) ref.getMessage();
+ Field propertiesField = Message.class.getDeclaredField("properties");
+ propertiesField.setAccessible(true);
+ Field textField = ActiveMQTextMessage.class.getDeclaredField("text");
+ textField.setAccessible(true);
+
+ assertNull(textField.get(message));
+ assertNull(propertiesField.get(message));
+ assertNotNull(message.getProperties());
+ assertNotNull(message.getText());
+ }
+ consumer.close();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ protected List<MessageReference> getSubscriptionMessages(Subscription sub) throws Exception {
+ Field f = null;
+
+ if (sub instanceof TopicSubscription) {
+ f = TopicSubscription.class.getDeclaredField("dispatched");
+ } else {
+ f = PrefetchSubscription.class.getDeclaredField("dispatched");
+ }
+ f.setAccessible(true);
+ return (List<MessageReference>) f.get(sub);
+ }
+
+}
\ No newline at end of file