You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/03/22 19:57:14 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6221

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 93bc7030e -> 7f5c09f2d


https://issues.apache.org/jira/browse/AMQ-6221

Synchronizing ActiveMQTextMessage on state changes to the content and
text fields so that they are always changed together.  This prevents
race conditions where data can be lost when using concurrent store and
dispatch.

(cherry picked from commit e0c549996479c2a1ccf70029ad4462cb987650f6)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c2a825e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c2a825e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c2a825e

Branch: refs/heads/activemq-5.13.x
Commit: 6c2a825ebb516e9b37cdb913800de0306ed7e670
Parents: 93bc703
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Mar 22 13:41:20 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Mar 22 18:51:12 2016 +0000

----------------------------------------------------------------------
 .../activemq/command/ActiveMQTextMessage.java   | 99 ++++++++++++++------
 1 file changed, 71 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c2a825e/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
index 4618341..c9345b1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQTextMessage.java
@@ -45,7 +45,7 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.ACTIVEMQ_TEXT_MESSAGE;
 
-    protected String text;
+    protected volatile String text;
 
     @Override
     public Message copy() {
@@ -55,14 +55,10 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
     }
 
     private void copy(ActiveMQTextMessage copy) {
-        //AMQ-6218 - Save text before calling super.copy() to prevent a race condition when
-        //concurrent store and dispatch is enabled in KahaDB
-        //The issue is sometimes beforeMarshall() gets called in between the time content and
-        //text are copied to the new object leading to both fields being null when text should
-        //not be null
-        String text = this.text;
-        super.copy(copy);
-        copy.text = text;
+        synchronized(this) {
+            super.copy(copy);
+            copy.text = text;
+        }
     }
 
     @Override
@@ -77,21 +73,30 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     @Override
     public void setText(String text) throws MessageNotWriteableException {
-        checkReadOnlyBody();
-        this.text = text;
-        setContent(null);
+        synchronized(this) {
+            checkReadOnlyBody();
+            this.text = text;
+            setContent(null);
+        }
     }
 
     @Override
     public String getText() throws JMSException {
-        ByteSequence content = getContent();
-        String text = this.text;
+        ByteSequence content;
+        String text;
+
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
 
         if (text == null && content != null) {
             text = decodeContent(content);
-            this.text = text;
-            setContent(null);
-            setCompressed(false);
+            synchronized(this) {
+                this.text = text;
+                setContent(null);
+                setCompressed(false);
+            }
         }
         return text;
     }
@@ -131,16 +136,43 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
 
     @Override
     public void storeContentAndClear() {
-        storeContent();
-        text=null;
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
+        if (content == null && text != null) {
+            content = marshallContent(text);
+        }
+        synchronized(this) {
+            setContent(content);
+            text=null;
+        }
     }
 
     @Override
     public void storeContent() {
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
+
+        if (content == null && text != null) {
+            content = marshallContent(text);
+        }
+
+        synchronized(this) {
+            setContent(content);
+        }
+    }
+
+    private ByteSequence marshallContent(String text) {
+        ByteSequence content = null;
         try {
-            ByteSequence content = getContent();
-            String text = this.text;
-            if (content == null && text != null) {
+            if (text != null) {
                 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
                 OutputStream os = bytesOut;
                 ActiveMQConnection connection = getConnection();
@@ -151,19 +183,23 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
                 DataOutputStream dataOut = new DataOutputStream(os);
                 MarshallingSupport.writeUTF8(dataOut, text);
                 dataOut.close();
-                setContent(bytesOut.toByteSequence());
+                content = bytesOut.toByteSequence();
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+        return content;
     }
 
+
     // see https://issues.apache.org/activemq/browse/AMQ-2103
     // and https://issues.apache.org/activemq/browse/AMQ-2966
     @Override
     public void clearMarshalledState() throws JMSException {
-        super.clearMarshalledState();
-        this.text = null;
+        synchronized(this) {
+            super.clearMarshalledState();
+            this.text = null;
+        }
     }
 
     /**
@@ -179,13 +215,20 @@ public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage
      */
     @Override
     public void clearBody() throws JMSException {
-        super.clearBody();
-        this.text = null;
+        synchronized(this) {
+            super.clearBody();
+            this.text = null;
+        }
     }
 
     @Override
     public int getSize() {
-        String text = this.text;
+        ByteSequence content;
+        String text;
+        synchronized(this) {
+            content = getContent();
+            text = this.text;
+        }
         if (size == 0 && content == null && text != null) {
             size = getMinimumMessageSize();
             if (marshalledProperties != null) {


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6222

Posted by cs...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6222

Moving clearMarshalledState execution to the async listener on an
async add to the message store.  This is necessary to make sure this
logic doesn't execute until after the message is marshalled for the
store.

(cherry picked from commit 75990ef14a092b629bf8d2127bc4786e51b31684)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7f5c09f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7f5c09f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7f5c09f2

Branch: refs/heads/activemq-5.13.x
Commit: 7f5c09f2d77c77f87a47fb738870c5ee5bc78c27
Parents: 6c2a825
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Tue Mar 22 17:37:56 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Tue Mar 22 18:55:57 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  22 +-
 .../org/apache/activemq/bugs/AMQ6222Test.java   | 312 +++++++++++++++++++
 2 files changed, 330 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7f5c09f2/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d59c1d8..c8c7c33 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -837,12 +837,26 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                     try {
                         if (messages.isCacheEnabled()) {
                             result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());
-                            result.addListener(new PendingMarshalUsageTracker(message));
+                            final PendingMarshalUsageTracker tracker = new PendingMarshalUsageTracker(message);
+                            result.addListener(new Runnable() {
+                                @Override
+                                public void run() {
+                                    //Execute usage tracker and then check isReduceMemoryFootprint()
+                                    tracker.run();
+                                    if (isReduceMemoryFootprint()) {
+                                        try {
+                                            message.clearMarshalledState();
+                                        } catch (JMSException e) {
+                                            throw new IllegalStateException(e);
+                                        }
+                                    }
+                                }
+                            });
                         } else {
                             store.addMessage(context, message);
-                        }
-                        if (isReduceMemoryFootprint()) {
-                            message.clearMarshalledState();
+                            if (isReduceMemoryFootprint()) {
+                                message.clearMarshalledState();
+                            }
                         }
                     } catch (Exception e) {
                         // we may have a store in inconsistent state, so reset the cursor

http://git-wip-us.apache.org/repos/asf/activemq/blob/7f5c09f2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
new file mode 100644
index 0000000..512ef9d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6222Test.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test can also be used to debug AMQ-6218 and AMQ-6221
+ *
+ * This test shows that messages are received with non-null data while
+ * several consumers are used.
+ */
+@RunWith(Parameterized.class)
+public class AMQ6222Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6222Test.class);
+
+    private final MessageType messageType;
+    private final boolean reduceMemoryFootPrint;
+    private final boolean concurrentDispatch;
+
+    private static enum MessageType {TEXT, MAP, OBJECT}
+    private final static boolean[] booleanVals = {true, false};
+    private static boolean[] reduceMemoryFootPrintVals = booleanVals;
+    private static boolean[] concurrentDispatchVals = booleanVals;
+
+    @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; ConcurrentDispatch:{2}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+
+        for (MessageType mt : MessageType.values()) {
+            for (boolean rmfVal : reduceMemoryFootPrintVals) {
+                for (boolean cdVal : concurrentDispatchVals) {
+                    values.add(new Object[] {mt, rmfVal, cdVal});
+                }
+            }
+        }
+
+        return values;
+    }
+
+    public AMQ6222Test(MessageType messageType, boolean reduceMemoryFootPrint,
+            boolean concurrentDispatch) {
+        this.messageType = messageType;
+        this.reduceMemoryFootPrint = reduceMemoryFootPrint;
+        this.concurrentDispatch = concurrentDispatch;
+    }
+
+    private BrokerService broker;
+    private final AtomicBoolean failure = new AtomicBoolean();
+    private CountDownLatch ready;
+    private URI connectionURI;
+    private URI vmConnectionURI;
+
+    private final boolean USE_VM_TRANSPORT = true;
+
+    private final int NUM_CONSUMERS = 30;
+    private final int NUM_PRODUCERS = 1;
+    private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS;
+
+    private int i = 0;
+    private String MessageId = null;
+    private int MessageCount = 0;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
+        broker.setDeleteAllMessagesOnStartup(true);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setReduceMemoryFootprint(reduceMemoryFootPrint);
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+        KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+        broker.start();
+        broker.waitUntilStarted();
+
+        ready = new CountDownLatch(NUM_TASKS);
+        connectionURI = connector.getPublishableConnectURI();
+        vmConnectionURI = broker.getVmConnectorURI();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Test(timeout=180000)
+    public void testMessagesAreValid() throws Exception {
+
+        ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS);
+        for (int i = 0; i < NUM_CONSUMERS; i++) {
+            LOG.info("Created Consumer: {}", i + 1);
+            tasks.execute(new HelloWorldConsumer());
+        }
+
+        for (int i = 0; i < NUM_PRODUCERS; i++) {
+            LOG.info("Created Producer: {}", i + 1);
+            tasks.execute(new HelloWorldProducer());
+        }
+
+        assertTrue(ready.await(20, TimeUnit.SECONDS));
+
+        try {
+            tasks.shutdown();
+            tasks.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            //should get exception with no errors
+        }
+
+        assertFalse("Test Encountered a null bodied message", failure.get());
+    }
+
+    public URI getBrokerURI() {
+        if (USE_VM_TRANSPORT) {
+            return vmConnectionURI;
+        } else {
+            return connectionURI;
+        }
+    }
+
+    public class HelloWorldProducer implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                ActiveMQConnectionFactory connectionFactory =
+                    new ActiveMQConnectionFactory(getBrokerURI());
+
+                Connection connection = connectionFactory.createConnection();
+                connection.start();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Destination destination = session.createTopic("VirtualTopic.AMQ6218Test");
+
+                MessageProducer producer = session.createProducer(destination);
+
+                LOG.info("Producer: {}", destination);
+
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                producer.setPriority(4);
+                producer.setTimeToLive(0);
+
+                ready.countDown();
+
+                int j = 0;
+                while (!failure.get()) {
+                    j++;
+                    String text = "AMQ Message Number :" + j;
+                    Message message = null;
+                    if (messageType.equals(MessageType.MAP)) {
+                        MapMessage mapMessage = session.createMapMessage();
+                        mapMessage.setString("text", text);
+                        message = mapMessage;
+                    } else if (messageType.equals(MessageType.OBJECT)) {
+                        ObjectMessage objectMessage = session.createObjectMessage();
+                        objectMessage.setObject(text);
+                        message = objectMessage;
+                    } else {
+                        message = session.createTextMessage(text);
+                    }
+                    producer.send(message);
+                    LOG.info("Sent message: {}", message.getJMSMessageID());
+                }
+
+                connection.close();
+            } catch (Exception e) {
+                LOG.error("Caught: " + e);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public class HelloWorldConsumer implements Runnable, ExceptionListener {
+        String queueName;
+
+        @Override
+        public void run() {
+            try {
+
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(getBrokerURI());
+                Connection connection = connectionFactory.createConnection();
+                connection.start();
+
+                Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+                synchronized (this) {
+                    queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test";
+                    i++;
+                    LOG.info(queueName);
+                }
+
+                Destination destination = session.createQueue(queueName);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                ready.countDown();
+
+                while (!failure.get()) {
+
+                    Message message = consumer.receive(500);
+
+                    if (message != null) {
+                        synchronized (this) {
+                            if (MessageId != null) {
+                                if (message.getJMSMessageID().equalsIgnoreCase(MessageId)) {
+                                    MessageCount++;
+                                } else {
+                                    LOG.info("Count of message " + MessageId + " is " + MessageCount);
+                                    MessageCount = 1;
+                                    MessageId = message.getJMSMessageID();
+                                }
+                            } else {
+                                MessageId = message.getJMSMessageID();
+                                MessageCount = 1;
+                            }
+                        }
+
+                        String text = null;
+                        if (messageType.equals(MessageType.OBJECT) && message instanceof ObjectMessage) {
+                            ObjectMessage objectMessage = (ObjectMessage) message;
+                            text = (String) objectMessage.getObject();
+                        } else if (messageType.equals(MessageType.TEXT) && message instanceof TextMessage) {
+                            TextMessage textMessage = (TextMessage) message;
+                            text = textMessage.getText();
+                        } else if (messageType.equals(MessageType.MAP) && message instanceof MapMessage) {
+                            MapMessage mapMessage = (MapMessage) message;
+                            text = mapMessage.getString("text");
+                        } else {
+                            LOG.info(queueName + " Message is not a instanceof " + messageType + " message id: " + message.getJMSMessageID() + message);
+                        }
+
+                        if (text == null) {
+                            LOG.warn(queueName + " text received as a null " + message);
+                            failure.set(true);
+                        } else {
+                            LOG.info(queueName + " text " + text + " message id: " + message.getJMSMessageID());
+                        }
+
+                        message.acknowledge();
+                    }
+                }
+
+                connection.close();
+            } catch (Exception e) {
+                LOG.error("Caught: ", e);
+            }
+        }
+
+        @Override
+        public synchronized void onException(JMSException ex) {
+            LOG.error("JMS Exception occurred.  Shutting down client.");
+        }
+    }
+}