You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/10/25 14:19:48 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6477

Repository: activemq
Updated Branches:
  refs/heads/master 114706a7b -> 7c3bb4010


https://issues.apache.org/jira/browse/AMQ-6477

ReduceMemoryFootprint now applies to non-persistent messages if they
have been marshalled and topics now clear memory after the recovery
policy check


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c3bb401
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c3bb401
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c3bb401

Branch: refs/heads/master
Commit: 7c3bb401007b4047c540287b53b435b20d3161c0
Parents: 114706a
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Oct 25 10:17:04 2016 -0400
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Oct 25 10:19:03 2016 -0400

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  10 +-
 .../apache/activemq/broker/region/Topic.java    |  11 +-
 .../activemq/command/ActiveMQMapMessage.java    |   4 +-
 .../activemq/command/ActiveMQObjectMessage.java |   4 +-
 .../activemq/command/ActiveMQTextMessage.java   |   4 +-
 .../org/apache/activemq/command/Message.java    |  15 +-
 .../org/apache/activemq/bugs/AMQ2103Test.java   |  15 +-
 .../apache/activemq/usecases/AMQ6477Test.java   | 188 +++++++++++++++++++
 8 files changed, 236 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index a74fe3b..6a42ebc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -839,9 +839,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     } else {
                         store.addMessage(context, message);
                     }
-                    if (isReduceMemoryFootprint()) {
-                        message.clearMarshalledState();
-                    }
                 } catch (Exception e) {
                     // we may have a store in inconsistent state, so reset the cursor
                     // before restarting normal broker operations
@@ -849,6 +846,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     throw e;
                 }
             }
+
+            //Clear the unmarshalled state if the message is marshalled
+            //Persistent messages will always be marshalled but non-persistent may not be
+            //Specially non-persistent messages over the VM transport won't be
+            if (isReduceMemoryFootprint() && message.isMarshalled()) {
+                message.clearUnMarshalledState();
+            }
             if(tryOrderedCursorAdd(message, context)) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 2aa6e18..0842467 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -516,9 +516,7 @@ public class Topic extends BaseDestination implements Task {
             }
             result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
 
-            if (isReduceMemoryFootprint()) {
-                message.clearMarshalledState();
-            }
+            //Moved the reduceMemoryfootprint clearing to the dispatch method
         }
 
         message.incrementReferenceCount();
@@ -758,6 +756,13 @@ public class Topic extends BaseDestination implements Task {
                     return;
                 }
             }
+
+            // Clear memory before dispatch - need to clear here because the call to
+            //subscriptionRecoveryPolicy.add() will unmarshall the state
+            if (isReduceMemoryFootprint() && message.isMarshalled()) {
+                message.clearUnMarshalledState();
+            }
+
             msgContext = context.getMessageEvaluationContext();
             msgContext.setDestination(destination);
             msgContext.setMessageReference(message);

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
index 7d80681..e1db3f7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQMapMessage.java
@@ -130,8 +130,8 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
     }
 
     @Override
-    public void clearMarshalledState() throws JMSException {
-        super.clearMarshalledState();
+    public void clearUnMarshalledState() throws JMSException {
+        super.clearUnMarshalledState();
         map.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
index 468ce79..8c5611f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java
@@ -224,8 +224,8 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
     }
 
     @Override
-    public void clearMarshalledState() throws JMSException {
-        super.clearMarshalledState();
+    public void clearUnMarshalledState() throws JMSException {
+        super.clearUnMarshalledState();
         this.object = null;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 97fc9e4..bb89378 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -153,8 +153,8 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
     // see https://issues.apache.org/activemq/browse/AMQ-2103
     // and https://issues.apache.org/activemq/browse/AMQ-2966
     @Override
-    public void clearMarshalledState() throws JMSException {
-        super.clearMarshalledState();
+    public void clearUnMarshalledState() throws JMSException {
+        super.clearUnMarshalledState();
         this.text = null;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-client/src/main/java/org/apache/activemq/command/Message.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/Message.java b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
index 83f3201..13e0030 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/Message.java
@@ -110,11 +110,24 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
     public abstract void storeContent();
     public abstract void storeContentAndClear();
 
-    // useful to reduce the memory footprint of a persisted message
+    /**
+     * @deprecated - This method name is misnamed
+     * @throws JMSException
+     */
     public void clearMarshalledState() throws JMSException {
+        clearUnMarshalledState();
+    }
+
+    // useful to reduce the memory footprint of a persisted message
+    public void clearUnMarshalledState() throws JMSException {
         properties = null;
     }
 
+    public boolean isMarshalled() {
+        return content != null && (marshalledProperties != null ||
+                (marshalledProperties == null && properties == null));
+    }
+
     protected void copy(Message copy) {
         super.copy(copy);
         copy.producerId = producerId;

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
index 8a952fd..ae84fca 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2103Test.java
@@ -21,7 +21,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.broker.BrokerTestSupport;
@@ -32,7 +32,16 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.usecases.MyObject;
+import org.junit.Ignore;
+
+import junit.framework.Test;
 
+
+/**
+ * AMQ-6477 changes the behavior to only clear memory if the marshalled state exists
+ * so this test no longer works
+ */
+@Ignore
 public class AMQ2103Test extends BrokerTestSupport {
     static PolicyEntry reduceMemoryFootprint = new PolicyEntry();
     static {
@@ -47,7 +56,7 @@ public class AMQ2103Test extends BrokerTestSupport {
     }
 
     public void initCombosForTestVerifyMarshalledStateIsCleared() throws Exception {
-        addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});    
+        addCombinationValues("defaultPolicy", new Object[]{defaultPolicy, null});
     }
 
     public static Test suite() {
@@ -60,6 +69,8 @@ public class AMQ2103Test extends BrokerTestSupport {
      * With vm transport and deferred serialisation and no persistence (mem persistence),
      * we see the message as sent by the client so we can validate the contents against
      * the policy
+     *
+     *
      * @throws Exception
      */
     public void testVerifyMarshalledStateIsCleared() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c3bb401/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
new file mode 100644
index 0000000..02d9425
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6477Test.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.PrefetchSubscription;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that clearUnMarshalled data gets called properly to reduce memory usage
+ */
+@RunWith(Parameterized.class)
+public class AMQ6477Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ6477Test.class);
+
+    @Rule
+    public TemporaryFolder dataDir = new TemporaryFolder();
+
+    private BrokerService brokerService;
+    private String connectionUri;
+    private final ActiveMQQueue queue = new ActiveMQQueue("queue");
+    private final ActiveMQTopic topic = new ActiveMQTopic("topic");
+    private final int numMessages = 10;
+    private Connection connection;
+    private Session session;
+    private SubType subType;
+    private boolean persistent;
+
+    protected enum SubType {QUEUE, TOPIC, DURABLE};
+
+    @Parameters(name="subType={0},isPersistent={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {SubType.QUEUE, false},
+                {SubType.TOPIC, false},
+                {SubType.DURABLE, false},
+                {SubType.QUEUE, true},
+                {SubType.TOPIC, true},
+                {SubType.DURABLE, true}
+            });
+    }
+
+    /**
+     */
+    public AMQ6477Test(SubType subType, boolean persistent) {
+        super();
+        this.subType = subType;
+        this.persistent = persistent;
+    }
+
+    @Before
+    public void before() throws Exception {
+        brokerService = new BrokerService();
+        TransportConnector connector = brokerService.addConnector("tcp://localhost:0");
+        connectionUri = connector.getPublishableConnectString();
+        brokerService.setPersistent(persistent);
+        brokerService.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+        brokerService.setUseJmx(false);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setReduceMemoryFootprint(true);
+
+        policyMap.setDefaultEntry(entry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        connection = factory.createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @After
+    public void after() throws Exception {
+        if (connection != null) {
+            connection.stop();
+        }
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    @Test(timeout=30000)
+    public void testReduceMemoryFootprint() throws Exception {
+
+        ActiveMQDestination destination = subType.equals(SubType.QUEUE) ? queue : topic;
+
+        MessageConsumer consumer = subType.equals(SubType.DURABLE) ?
+                session.createDurableSubscriber(topic, "sub1") : session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < numMessages; i++) {
+            TextMessage m = session.createTextMessage("test");
+            m.setStringProperty("test", "test");
+            producer.send(m);
+        }
+
+        Subscription sub = brokerService.getDestination(destination).getConsumers().get(0);
+        List<MessageReference> messages = getSubscriptionMessages(sub);
+
+        //Go through each message and make sure the unmarshalled fields are null
+        //then call the getters which will unmarshall the data again to show the marshalled
+        //data exists
+        for (MessageReference ref : messages) {
+            ActiveMQTextMessage message = (ActiveMQTextMessage) ref.getMessage();
+            Field propertiesField = Message.class.getDeclaredField("properties");
+            propertiesField.setAccessible(true);
+            Field textField = ActiveMQTextMessage.class.getDeclaredField("text");
+            textField.setAccessible(true);
+
+            assertNull(textField.get(message));
+            assertNull(propertiesField.get(message));
+            assertNotNull(message.getProperties());
+            assertNotNull(message.getText());
+        }
+        consumer.close();
+    }
+
+
+    @SuppressWarnings("unchecked")
+    protected List<MessageReference> getSubscriptionMessages(Subscription sub) throws Exception {
+        Field f = null;
+
+        if (sub instanceof TopicSubscription) {
+            f = TopicSubscription.class.getDeclaredField("dispatched");
+        } else {
+            f = PrefetchSubscription.class.getDeclaredField("dispatched");
+        }
+        f.setAccessible(true);
+        return (List<MessageReference>) f.get(sub);
+    }
+
+}
\ No newline at end of file