You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/26 01:01:45 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2149 Protecting
message.sendBuffer from races encoding it
Repository: activemq-artemis
Updated Branches:
refs/heads/master 76342e8e6 -> 9a885f142
ARTEMIS-2149 Protecting message.sendBuffer from races encoding it
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/775e7d26
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/775e7d26
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/775e7d26
Branch: refs/heads/master
Commit: 775e7d26039394978aa8dcd6729e27565f35b121
Parents: 76342e8
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 25 17:27:47 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Oct 25 20:50:38 2018 -0400
----------------------------------------------------------------------
.../artemis/core/message/impl/CoreMessage.java | 4 +-
.../artemis/message/CoreMTMessageTest.java | 124 +++++++++++++++++++
2 files changed, 126 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/775e7d26/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index fa825e8..62a81a1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -294,7 +294,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
*/
@Override
- public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
+ public synchronized void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
checkEncode();
sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
}
@@ -303,7 +303,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
* Recast the message as an 1.4 message
*/
@Override
- public void sendBuffer_1X(ByteBuf sendBuffer) {
+ public synchronized void sendBuffer_1X(ByteBuf sendBuffer) {
checkEncode();
ByteBuf tmpBuffer = buffer.duplicate();
sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/775e7d26/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
new file mode 100644
index 0000000..8553d20
--- /dev/null
+++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/message/CoreMTMessageTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.message;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CoreMTMessageTest {
+
+ public static final SimpleString ADDRESS = new SimpleString("this.local.address");
+ public static final SimpleString ADDRESS2 = new SimpleString("some.other.address");
+ public static final byte MESSAGE_TYPE = Message.TEXT_TYPE;
+ public static final boolean DURABLE = true;
+ public static final long EXPIRATION = 123L;
+ public static final long TIMESTAMP = 321L;
+ public static final byte PRIORITY = (byte) 3;
+
+ @Test
+ public void testDecodeEncodeMultiThread() throws Exception {
+
+ for (int i = 0; i < 100; i++) {
+ internalTest();
+ }
+ }
+
+ public void internalTest() throws Exception {
+
+ CoreMessageObjectPools objectPools = new CoreMessageObjectPools();
+ SimpleString propValue = UUIDGenerator.getInstance().generateSimpleStringUUID();
+
+ UUID userID = UUIDGenerator.getInstance().generateUUID();
+ String body = UUIDGenerator.getInstance().generateStringUUID();
+ ClientMessageImpl message = new ClientMessageImpl(MESSAGE_TYPE, DURABLE, EXPIRATION, TIMESTAMP, PRIORITY, 10 * 1024, objectPools);
+ TextMessageUtil.writeBodyText(message.getBodyBuffer(), SimpleString.toSimpleString(body));
+
+ message.setAddress(ADDRESS);
+ message.setUserID(userID);
+ message.getProperties().putSimpleStringProperty(SimpleString.toSimpleString("str-prop"), propValue);
+
+ ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+ message.sendBuffer(buffer.byteBuf(), 0);
+
+
+ CoreMessage coreMessage = new CoreMessage(objectPools);
+ coreMessage.receiveBuffer(buffer.byteBuf());
+ coreMessage.setAddress(ADDRESS2.toString());
+ coreMessage.setMessageID(33);
+
+
+ Thread[] threads = new Thread[50];
+ final CountDownLatch aligned = new CountDownLatch(threads.length);
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(10 * 1024);
+ aligned.countDown();
+ Assert.assertTrue(startFlag.await(5, TimeUnit.SECONDS));
+ coreMessage.messageChanged();
+ coreMessage.sendBuffer(buffer.byteBuf(), 0);
+ CoreMessage recMessage = new CoreMessage();
+ recMessage.receiveBuffer(buffer.byteBuf());
+ Assert.assertEquals(ADDRESS2, recMessage.getAddressSimpleString());
+ Assert.assertEquals(33, recMessage.getMessageID());
+ Assert.assertEquals(propValue, recMessage.getSimpleStringProperty(SimpleString.toSimpleString("str-prop")));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ };
+
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new Thread(runnable);
+ threads[i].start();
+ }
+
+ aligned.await(10, TimeUnit.SECONDS);
+ coreMessage.messageChanged();
+ startFlag.countDown();
+
+ for (Thread t : threads) {
+ t.join(10000);
+ Assert.assertFalse(t.isAlive());
+ }
+
+ Assert.assertEquals(0, errors.get());
+
+ }
+
+}
[2/2] activemq-artemis git commit: This closes #2396
Posted by cl...@apache.org.
This closes #2396
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a885f14
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a885f14
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a885f14
Branch: refs/heads/master
Commit: 9a885f142553f67fa54733349aace40606d80a0c
Parents: 76342e8 775e7d2
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 25 21:00:36 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Oct 25 21:00:36 2018 -0400
----------------------------------------------------------------------
.../artemis/core/message/impl/CoreMessage.java | 4 +-
.../artemis/message/CoreMTMessageTest.java | 124 +++++++++++++++++++
2 files changed, 126 insertions(+), 2 deletions(-)
----------------------------------------------------------------------