You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/21 14:36:33 UTC

[activemq-artemis] branch main updated: ARTEMIS-3729 Fix JMS CORE client commit after async sends

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d0c550b  ARTEMIS-3729 Fix JMS CORE client commit after async sends
d0c550b is described below

commit d0c550bcd781273df023c19490f31e21f12c2718
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Fri Mar 18 16:01:41 2022 +0100

    ARTEMIS-3729 Fix JMS CORE client commit after async sends
---
 .../core/protocol/core/CoreRemotingConnection.java |  5 ++
 .../protocol/core/impl/ActiveMQSessionContext.java | 10 ++-
 .../core/protocol/core/impl/ChannelImpl.java       |  7 +-
 .../core/protocol/core/impl/PacketDecoder.java     |  7 +-
 .../core/protocol/core/impl/PacketImpl.java        |  3 +
 .../impl/wireformat/NullResponseMessage_V2.java    |  7 +-
 ...essage_V2.java => SessionCommitMessage_V2.java} | 43 +++-------
 .../SessionSendContinuationMessage_V2.java         |  2 +
 .../wireformat/SessionXAResponseMessage_V2.java    |  2 +
 .../src/main/resources/activemq-version.properties |  2 +-
 .../protocol/core/ServerSessionPacketHandler.java  |  2 +-
 pom.xml                                            |  2 +-
 .../integration/client/JMSTransactionTest.java     | 94 ++++++++++++++++++++++
 13 files changed, 141 insertions(+), 45 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
index aa85590..88ba083 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java
@@ -61,6 +61,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
       return  version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
    }
 
+   default boolean isVersionSupportCommitV2() {
+      int version = getChannelVersion();
+      return  version >= PacketImpl.ARTEMIS_2_21_0_VERSION;
+   }
+
    /**
     * Sets the client protocol used on the communication. This will determine if the client has
     * support for certain packet types
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 811ebef..6013266 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -85,6 +85,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@@ -443,13 +445,17 @@ public class ActiveMQSessionContext extends SessionContext {
 
    @Override
    public void simpleCommit() throws ActiveMQException {
-      sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+      simpleCommit(true);
    }
 
    @Override
    public void simpleCommit(boolean block) throws ActiveMQException {
       if (block) {
-         sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+         if (!sessionChannel.getConnection().isVersionSupportCommitV2()) {
+            sessionChannel.sendBlocking(new SessionCommitMessage(), PacketImpl.NULL_RESPONSE);
+         } else {
+            sessionChannel.sendBlocking(new SessionCommitMessage_V2(), PacketImpl.NULL_RESPONSE);
+         }
       } else {
          sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
       }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index d133826..e0bb33b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -132,6 +133,8 @@ public final class ChannelImpl implements Channel {
 
    private final List<Interceptor> interceptors;
 
+   private final AtomicLong blockingCorrelationID = new AtomicLong(-1);
+
    public ChannelImpl(final CoreRemotingConnection connection,
                       final long id,
                       final int confWindowSize,
@@ -481,6 +484,8 @@ public final class ChannelImpl implements Channel {
       synchronized (sendBlockingLock) {
          packet.setChannelID(id);
 
+         packet.setCorrelationID(blockingCorrelationID.decrementAndGet());
+
          final ActiveMQBuffer buffer = packet.encode(connection);
 
          lock.lock();
@@ -508,7 +513,7 @@ public final class ChannelImpl implements Channel {
 
             long start = System.currentTimeMillis();
 
-            while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)) && toWait > 0) {
+            while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) {
                try {
                   sendCondition.await(toWait, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
index 640e079..2967c97 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java
@@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
@@ -252,7 +253,11 @@ public abstract class PacketDecoder implements Serializable {
             break;
          }
          case SESS_COMMIT: {
-            packet = new SessionCommitMessage();
+            if (!connection.isVersionSupportCommitV2()) {
+               packet = new SessionCommitMessage();
+            } else {
+               packet = new SessionCommitMessage_V2();
+            }
             break;
          }
          case SESS_ROLLBACK: {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 23e20fe..1d1471c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -41,6 +41,9 @@ public class PacketImpl implements Packet {
    // 2.18.0
    public static final int ARTEMIS_2_18_0_VERSION = 131;
 
+   // 2.21.0
+   public static final int ARTEMIS_2_21_0_VERSION = 132;
+
    public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
    public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
    public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
index b5d1132..7e47e0e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
@@ -53,6 +53,8 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
       super.decodeRest(buffer);
       if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
          correlationID = buffer.readLong();
+      } else {
+         correlationID = -1;
       }
    }
 
@@ -72,11 +74,6 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
    }
 
    @Override
-   protected String getPacketString() {
-      return super.getPacketString() + ", correlationID=" + correlationID;
-   }
-
-   @Override
    public int hashCode() {
       final int prime = 31;
       int result = super.hashCode();
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java
similarity index 76%
copy from artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
copy to artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java
index b5d1132..8bbe619 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java
@@ -19,19 +19,10 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.utils.DataConstants;
 
-public class NullResponseMessage_V2 extends NullResponseMessage {
+public class SessionCommitMessage_V2 extends SessionCommitMessage {
 
    private long correlationID;
 
-   public NullResponseMessage_V2(final long correlationID) {
-      super();
-      this.correlationID = correlationID;
-   }
-
-   public NullResponseMessage_V2() {
-      super();
-   }
-
    @Override
    public long getCorrelationID() {
       return correlationID;
@@ -43,6 +34,11 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
    }
 
    @Override
+   public boolean isResponseAsync() {
+      return true;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeLong(correlationID);
@@ -53,6 +49,8 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
       super.decodeRest(buffer);
       if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
          correlationID = buffer.readLong();
+      } else {
+         correlationID = -1;
       }
    }
 
@@ -62,21 +60,6 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
    }
 
    @Override
-   public final boolean isResponse() {
-      return true;
-   }
-
-   @Override
-   public final boolean isResponseAsync() {
-      return true;
-   }
-
-   @Override
-   protected String getPacketString() {
-      return super.getPacketString() + ", correlationID=" + correlationID;
-   }
-
-   @Override
    public int hashCode() {
       final int prime = 31;
       int result = super.hashCode();
@@ -92,19 +75,13 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
       if (!super.equals(obj)) {
          return false;
       }
-      if (!(obj instanceof NullResponseMessage_V2)) {
+      if (!(obj instanceof SessionCommitMessage_V2)) {
          return false;
       }
-      NullResponseMessage_V2 other = (NullResponseMessage_V2) obj;
+      SessionCommitMessage_V2 other = (SessionCommitMessage_V2) obj;
       if (correlationID != other.correlationID) {
          return false;
       }
       return true;
    }
-
-   @Override
-   public void reset() {
-      super.reset();
-      correlationID = 0;
-   }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
index 9fd0968..1a6eb1d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java
@@ -64,6 +64,8 @@ public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMe
       super.decodeRest(buffer);
       if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
          correlationID = buffer.readLong();
+      } else {
+         correlationID = -1;
       }
    }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
index c9bf965..9f466eb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java
@@ -48,6 +48,8 @@ public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
       super.decodeRest(buffer);
       if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
          correlationID = buffer.readLong();
+      } else {
+         correlationID = -1;
       }
    }
 
diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties
index e8ece76..92a1e34 100644
--- a/artemis-core-client/src/main/resources/activemq-version.properties
+++ b/artemis-core-client/src/main/resources/activemq-version.properties
@@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
 activemq.version.microVersion=${activemq.version.microVersion}
 activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
 activemq.version.versionTag=${activemq.version.versionTag}
-activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131
+activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 5993c7a..cdd0812 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -660,7 +660,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    }
 
    private boolean requireNullResponseMessage_V1(Packet packet) {
-      return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
+      return channel.getConnection().isVersionBeforeAsyncResponseChange();
    }
 
    private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
diff --git a/pom.xml b/pom.xml
index 0b3de59..4ca5ea6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,7 +177,7 @@
       <activemq.version.majorVersion>1</activemq.version.majorVersion>
       <activemq.version.minorVersion>0</activemq.version.minorVersion>
       <activemq.version.microVersion>0</activemq.version.microVersion>
-      <activemq.version.incrementingVersion>131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
+      <activemq.version.incrementingVersion>132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
       <activemq.version.versionTag>${project.version}</activemq.version.versionTag>
       <ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
new file mode 100644
index 0000000..d79ea77
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.jms.CompletionListener;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class JMSTransactionTest extends JMSTestBase {
+
+   @Test(timeout = 60000)
+   public void testAsyncProduceMessageAndCommit() throws Throwable {
+      final String queueName = "TEST";
+      final int messages = 10;
+
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
+      cf.setConfirmationWindowSize(1000000);
+      cf.setBlockOnDurableSend(true);
+      cf.setBlockOnNonDurableSend(true);
+
+      CountDownLatch commitLatch = new CountDownLatch(1);
+      AtomicInteger sentMessages = new AtomicInteger(0);
+
+      server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection1) -> {
+         if (packet.getType() == PacketImpl.SESS_COMMIT) {
+            commitLatch.countDown();
+         }
+         return true;
+      });
+
+      try (Connection connection = cf.createConnection();
+           Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
+
+         javax.jms.Queue queue = session.createQueue(queueName);
+         MessageProducer p = session.createProducer(queue);
+
+         for (int i = 0; i < messages; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText("Message:" + i);
+            p.send(message, new CompletionListener() {
+               @Override
+               public void onCompletion(Message message) {
+                  try {
+                     commitLatch.await();
+                     sentMessages.incrementAndGet();
+                  } catch (Exception e) {
+                     e.printStackTrace();
+                  }
+               }
+
+               @Override
+               public void onException(Message message, Exception exception) {
+
+               }
+            });
+         }
+
+         session.commit();
+         Assert.assertEquals(messages, sentMessages.get());
+
+         org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
+         Wait.assertEquals(messages, queueView::getMessageCount);
+      }
+   }
+}