You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/26 01:02:19 UTC

activemq-artemis git commit: ARTEMIS-2149 Protecting message.sendBuffer from races encoding it

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x b3ba6ecaf -> 5bd533a97


ARTEMIS-2149 Protecting message.sendBuffer from races encoding it

(cherry picked from commit 775e7d26039394978aa8dcd6729e27565f35b121)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5bd533a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5bd533a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5bd533a9

Branch: refs/heads/2.6.x
Commit: 5bd533a97444abffe78572f00f9d8c8d02ed5e56
Parents: b3ba6ec
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 25 17:27:47 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Oct 25 21:02:04 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/message/impl/CoreMessage.java  |   4 +-
 .../artemis/message/CoreMTMessageTest.java      | 124 +++++++++++++++++++
 2 files changed, 126 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5bd533a9/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index fa825e8..62a81a1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -294,7 +294,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
     * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
     */
    @Override
-   public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
+   public synchronized void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
       checkEncode();
       sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    }
@@ -303,7 +303,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
     * Recast the message as an 1.4 message
     */
    @Override
-   public void sendBuffer_1X(ByteBuf sendBuffer) {
+   public synchronized void sendBuffer_1X(ByteBuf sendBuffer) {
       checkEncode();
       ByteBuf tmpBuffer = buffer.duplicate();
       sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5bd533a9/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
new file mode 100644
index 0000000..8553d20
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.message;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CoreMTMessageTest {
+
+   public static final SimpleString ADDRESS = new SimpleString("this.local.address");
+   public static final SimpleString ADDRESS2 = new SimpleString("some.other.address");
+   public static final byte MESSAGE_TYPE = Message.TEXT_TYPE;
+   public static final boolean DURABLE = true;
+   public static final long EXPIRATION = 123L;
+   public static final long TIMESTAMP = 321L;
+   public static final byte PRIORITY = (byte) 3;
+
+   @Test
+   public void testDecodeEncodeMultiThread() throws Exception {
+
+      for (int i = 0; i < 100; i++) {
+         internalTest();
+      }
+   }
+
+   public void internalTest() throws Exception {
+
+      CoreMessageObjectPools objectPools = new CoreMessageObjectPools();
+      SimpleString propValue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+
+      UUID userID = UUIDGenerator.getInstance().generateUUID();
+      String body = UUIDGenerator.getInstance().generateStringUUID();
+      ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, objectPools);
+      TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
+
+      message.setAddress(ADDRESS);
+      message.setUserID(userID);
+      message.getProperties().putSimpleStringProperty(SimpleString.toSimpleString("str-prop"), propValue);
+
+      ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+      message.sendBuffer(buffer.byteBuf(), 0);
+
+
+      CoreMessage coreMessage = new CoreMessage(objectPools);
+      coreMessage.receiveBuffer(buffer.byteBuf());
+      coreMessage.setAddress(ADDRESS2.toString());
+      coreMessage.setMessageID(33);
+
+
+      Thread[] threads = new Thread[50];
+      final CountDownLatch aligned = new CountDownLatch(threads.length);
+      final CountDownLatch startFlag = new CountDownLatch(1);
+      final AtomicInteger errors = new AtomicInteger(0);
+
+      Runnable runnable = new Runnable() {
+         @Override
+         public void run() {
+            try {
+               ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+               aligned.countDown();
+               Assert.assertTrue(startFlag.await(5, TimeUnit.SECONDS));
+               coreMessage.messageChanged();
+               coreMessage.sendBuffer(buffer.byteBuf(), 0);
+               CoreMessage recMessage = new CoreMessage();
+               recMessage.receiveBuffer(buffer.byteBuf());
+               Assert.assertEquals(ADDRESS2, recMessage.getAddressSimpleString());
+               Assert.assertEquals(33, recMessage.getMessageID());
+               Assert.assertEquals(propValue, recMessage.getSimpleStringProperty(SimpleString.toSimpleString("str-prop")));
+            } catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         }
+      };
+
+
+      for (int i = 0; i < threads.length; i++) {
+         threads[i] = new Thread(runnable);
+         threads[i].start();
+      }
+
+      aligned.await(10, TimeUnit.SECONDS);
+      coreMessage.messageChanged();
+      startFlag.countDown();
+
+      for (Thread t : threads) {
+         t.join(10000);
+         Assert.assertFalse(t.isAlive());
+      }
+
+      Assert.assertEquals(0, errors.get());
+
+   }
+
+}