You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/06/03 08:37:30 UTC

[activemq-artemis] branch main updated: ARTEMIS-3326 - fix state visibility between netty and actor thread after initial connection info processing. fix and test

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e77f93  ARTEMIS-3326 - fix state visibility between netty and actor thread after initial connection info processing. fix and test
0e77f93 is described below

commit 0e77f93f8322c1b1be719e5dc67f8f0d0c2edf1c
Author: gtully <ga...@gmail.com>
AuthorDate: Mon May 31 11:54:18 2021 +0100

    ARTEMIS-3326 - fix state visibility between netty and actor thread after initial connection info processing. fix and test
---
 .../core/protocol/openwire/OpenWireConnection.java |  25 +--
 .../protocol/openwire/OpenWireProtocolManager.java |   2 +-
 .../openwire/amq/OpenWireConnectionTest.java       | 167 +++++++++++++++++++++
 .../core/server/cluster/ClusterManager.java        |   2 +-
 4 files changed, 183 insertions(+), 13 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 1216a52..8807c7a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -167,7 +167,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
-   private ConnectionState state;
+   private volatile ConnectionState state;
 
    private volatile boolean noLocal;
 
@@ -194,7 +194,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private ConnectionEntry connectionEntry;
    private boolean useKeepAlive;
    private long maxInactivityDuration;
-   private Actor<Command> openWireActor;
+   private volatile Actor<Command> openWireActor;
 
    private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
 
@@ -285,7 +285,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
             traceBufferReceived(connectionID, command);
          }
 
-         if (openWireActor != null) {
+         final Actor<Command> localVisibleActor = openWireActor;
+         if (localVisibleActor != null) {
             openWireActor.act(command);
          } else {
             act(command);
@@ -942,15 +943,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       this.useKeepAlive = useKeepAlive;
       this.maxInactivityDuration = inactivityDuration;
 
-      protocolManager.getScheduledPool().schedule(new Runnable() {
-         @Override
-         public void run() {
-            if (inactivityDuration >= 0) {
-               connectionEntry.ttl = inactivityDuration;
+      if (this.useKeepAlive) {
+         protocolManager.getScheduledPool().schedule(new Runnable() {
+            @Override
+            public void run() {
+               if (inactivityDuration >= 0) {
+                  connectionEntry.ttl = inactivityDuration;
+               }
             }
-         }
-      }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
-      checkInactivity();
+         }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
+         checkInactivity();
+      }
    }
 
    public void addKnownDestination(final SimpleString address) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 840a9a7..bcc39fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -533,7 +533,7 @@ public class OpenWireProtocolManager  extends AbstractProtocolManager<Command, O
    public void configureInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException {
       long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration();
       long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay();
-      boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive;
+      boolean useKeepAliveToUse = inactivityDurationToUse == 0L ? false : this.useKeepAlive;
       connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse);
    }
 
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
new file mode 100644
index 0000000..c64ecc1
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+
+public class OpenWireConnectionTest {
+
+   @Test
+   public void testActorStateVisibility() throws Exception {
+
+      OrderedExecutorFactory orderedExecutorFactory = new OrderedExecutorFactory(Executors.newFixedThreadPool(5));
+      Executor orderedDelegate = orderedExecutorFactory.getExecutor();
+
+      ClusterManager clusterManager = Mockito.mock(ClusterManager.class);
+      ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
+      StorageManager storageManager = new NullStorageManager();
+      Mockito.when(server.getStorageManager()).thenReturn(storageManager);
+      Mockito.when(server.newOperationContext()).thenReturn(storageManager.newContext(orderedExecutorFactory.getExecutor()));
+      Mockito.when(server.getClusterManager()).thenReturn(clusterManager);
+      Mockito.when(clusterManager.getDefaultConnection(Mockito.any())).thenReturn(null);
+      SecurityStore securityStore = Mockito.mock(SecurityStore.class);
+      Mockito.when(server.getSecurityStore()).thenReturn(securityStore);
+      Mockito.when(securityStore.authenticate(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(null);
+      ServerSession serverSession = Mockito.mock(ServerSession.class);
+      Mockito.when(serverSession.getName()).thenReturn("session");
+      Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
+                                        Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString());
+
+      OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
+      openWireProtocolManager.setSecurityDomain("securityDomain");
+      openWireProtocolManager.setSupportAdvisory(false);
+
+      int commandId = 0;
+      WireFormatInfo wireFormatInfo = new WireFormatInfo();
+      wireFormatInfo.setVersion(OpenWireFormat.DEFAULT_WIRE_VERSION);
+      wireFormatInfo.setMaxFrameSize(1024);
+      wireFormatInfo.setMaxInactivityDuration(0); // disable keepalive to simplify
+      wireFormatInfo.setCommandId(commandId++);
+      OpenWireFormat wf = openWireProtocolManager.wireFormat();
+
+      ConnectionInfo connectionInfo = new ConnectionInfo(new ConnectionId("1:1"));
+      connectionInfo.setClientId("client1");
+      connectionInfo.setResponseRequired(true);
+      connectionInfo.setCommandId(commandId++);
+
+      SessionInfo sessionInfo = new SessionInfo(connectionInfo, 1);
+      sessionInfo.setResponseRequired(true);
+      sessionInfo.setCommandId(commandId++);
+
+      ProducerInfo producerInfo = new ProducerInfo(sessionInfo, 1);
+      producerInfo.setResponseRequired(true);
+      producerInfo.setCommandId(commandId++);
+
+      ByteSequence bytes = wf.marshal(wireFormatInfo);
+      final ActiveMQBuffer wireFormatInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+      wireFormatInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+      bytes = wf.marshal(connectionInfo);
+      final ActiveMQBuffer connectionInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+      connectionInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+      bytes = wf.marshal(sessionInfo);
+      final ActiveMQBuffer sessionInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+      sessionInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+      bytes = wf.marshal(producerInfo);
+      final ActiveMQBuffer producerInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+      producerInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+      RemoveInfo removeInfo = connectionInfo.createRemoveCommand();
+      removeInfo.setCommandId(commandId++);
+      removeInfo.setResponseRequired(true);
+      bytes = wf.marshal(removeInfo);
+      final ActiveMQBuffer removeInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+      removeInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+      Connection connection = Mockito.mock(Connection.class);
+      Mockito.doReturn(new ChannelBufferWrapper(Unpooled.buffer(1024))).when(connection).createTransportBuffer(Mockito.anyInt());
+
+      // in a loop do create connection/session/producer/remove to expose thread state sharing visibility
+      for (int i = 0; i < 5000; i++) {
+
+         final CountDownLatch okResponses = new CountDownLatch(3);
+         final CountDownLatch okResponsesWithRemove = new CountDownLatch(4);
+
+         OpenWireConnection openWireConnection = new OpenWireConnection(connection, server, openWireProtocolManager, wf, orderedDelegate) {
+            @Override
+            public void physicalSend(Command command) throws IOException {
+               if (command.isResponse()) {
+                  if (!((Response)command).isException()) {
+                     okResponses.countDown();
+                     okResponsesWithRemove.countDown();
+                  }
+               }
+            }
+         };
+
+         openWireConnection.bufferReceived(openWireConnection,  wireFormatInfoBuffer);
+         openWireConnection.bufferReceived(openWireConnection,  connectionInfoBuffer);
+         // actor tasks
+         openWireConnection.bufferReceived(openWireConnection,  sessionInfoBuffer);
+         openWireConnection.bufferReceived(openWireConnection,  producerInfoBuffer);
+
+         assertTrue("fail on ok response check, iteration: " + i, okResponses.await(10, TimeUnit.SECONDS));
+
+         openWireConnection.bufferReceived(openWireConnection,  removeInfoBuffer);
+
+         assertTrue("fail on ok response check with remove, iteration: " + i, okResponsesWithRemove.await(10, TimeUnit.SECONDS));
+
+         wireFormatInfoBuffer.resetReaderIndex();
+         connectionInfoBuffer.resetReaderIndex();
+         sessionInfoBuffer.resetReaderIndex();
+         producerInfoBuffer.resetReaderIndex();
+         removeInfoBuffer.resetReaderIndex();
+
+      }
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 78bc50d..6950120 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -75,7 +75,7 @@ import org.jboss.logging.Logger;
  * {@link ClusterConnectionImpl}. As a node is discovered a new {@link org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge} is
  * deployed.
  */
-public final class ClusterManager implements ActiveMQComponent {
+public class ClusterManager implements ActiveMQComponent {
 
    private static final Logger logger = Logger.getLogger(ClusterManager.class);