You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/06/03 08:37:30 UTC
[activemq-artemis] branch main updated: ARTEMIS-3326 - fix state
visibility between netty and actor thread after initial connection info
processing. fix and test
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 0e77f93 ARTEMIS-3326 - fix state visibility between netty and actor thread after initial connection info processing. fix and test
0e77f93 is described below
commit 0e77f93f8322c1b1be719e5dc67f8f0d0c2edf1c
Author: gtully <ga...@gmail.com>
AuthorDate: Mon May 31 11:54:18 2021 +0100
ARTEMIS-3326 - fix state visibility between netty and actor thread after initial connection info processing. fix and test
---
.../core/protocol/openwire/OpenWireConnection.java | 25 +--
.../protocol/openwire/OpenWireProtocolManager.java | 2 +-
.../openwire/amq/OpenWireConnectionTest.java | 167 +++++++++++++++++++++
.../core/server/cluster/ClusterManager.java | 2 +-
4 files changed, 183 insertions(+), 13 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 1216a52..8807c7a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -167,7 +167,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
- private ConnectionState state;
+ private volatile ConnectionState state;
private volatile boolean noLocal;
@@ -194,7 +194,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
private long maxInactivityDuration;
- private Actor<Command> openWireActor;
+ private volatile Actor<Command> openWireActor;
private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
@@ -285,7 +285,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
traceBufferReceived(connectionID, command);
}
- if (openWireActor != null) {
+ final Actor<Command> localVisibleActor = openWireActor;
+ if (localVisibleActor != null) {
localVisibleActor.act(command); // dispatch via the null-checked snapshot, not a second read of the volatile field
} else {
act(command);
@@ -942,15 +943,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.useKeepAlive = useKeepAlive;
this.maxInactivityDuration = inactivityDuration;
- protocolManager.getScheduledPool().schedule(new Runnable() {
- @Override
- public void run() {
- if (inactivityDuration >= 0) {
- connectionEntry.ttl = inactivityDuration;
+ if (this.useKeepAlive) {
+ protocolManager.getScheduledPool().schedule(new Runnable() {
+ @Override
+ public void run() {
+ if (inactivityDuration >= 0) {
+ connectionEntry.ttl = inactivityDuration;
+ }
}
- }
- }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
- checkInactivity();
+ }, inactivityDurationInitialDelay, TimeUnit.MILLISECONDS);
+ checkInactivity();
+ }
}
public void addKnownDestination(final SimpleString address) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 840a9a7..bcc39fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -533,7 +533,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
public void configureInactivityParams(OpenWireConnection connection, WireFormatInfo command) throws IOException {
long inactivityDurationToUse = command.getMaxInactivityDuration() > this.maxInactivityDuration ? this.maxInactivityDuration : command.getMaxInactivityDuration();
long inactivityDurationInitialDelayToUse = command.getMaxInactivityDurationInitalDelay() > this.maxInactivityDurationInitalDelay ? this.maxInactivityDurationInitalDelay : command.getMaxInactivityDurationInitalDelay();
- boolean useKeepAliveToUse = this.maxInactivityDuration == 0L ? false : this.useKeepAlive;
+ boolean useKeepAliveToUse = inactivityDurationToUse == 0L ? false : this.useKeepAlive;
connection.setUpTtl(inactivityDurationToUse, inactivityDurationInitialDelayToUse, useKeepAliveToUse);
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
new file mode 100644
index 0000000..c64ecc1
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/amq/OpenWireConnectionTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire.amq;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.Unpooled;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.security.SecurityStore;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+
+public class OpenWireConnectionTest {
+
+ @Test
+ public void testActorStateVisibility() throws Exception {
+
+ OrderedExecutorFactory orderedExecutorFactory = new OrderedExecutorFactory(Executors.newFixedThreadPool(5));
+ Executor orderedDelegate = orderedExecutorFactory.getExecutor();
+
+ ClusterManager clusterManager = Mockito.mock(ClusterManager.class);
+ ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
+ StorageManager storageManager = new NullStorageManager();
+ Mockito.when(server.getStorageManager()).thenReturn(storageManager);
+ Mockito.when(server.newOperationContext()).thenReturn(storageManager.newContext(orderedExecutorFactory.getExecutor()));
+ Mockito.when(server.getClusterManager()).thenReturn(clusterManager);
+ Mockito.when(clusterManager.getDefaultConnection(Mockito.any())).thenReturn(null);
+ SecurityStore securityStore = Mockito.mock(SecurityStore.class);
+ Mockito.when(server.getSecurityStore()).thenReturn(securityStore);
+ Mockito.when(securityStore.authenticate(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(null);
+ ServerSession serverSession = Mockito.mock(ServerSession.class);
+ Mockito.when(serverSession.getName()).thenReturn("session");
+ Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
+ Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString());
+
+ OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
+ openWireProtocolManager.setSecurityDomain("securityDomain");
+ openWireProtocolManager.setSupportAdvisory(false);
+
+ int commandId = 0;
+ WireFormatInfo wireFormatInfo = new WireFormatInfo();
+ wireFormatInfo.setVersion(OpenWireFormat.DEFAULT_WIRE_VERSION);
+ wireFormatInfo.setMaxFrameSize(1024);
+ wireFormatInfo.setMaxInactivityDuration(0); // disable keepalive to simplify
+ wireFormatInfo.setCommandId(commandId++);
+ OpenWireFormat wf = openWireProtocolManager.wireFormat();
+
+ ConnectionInfo connectionInfo = new ConnectionInfo(new ConnectionId("1:1"));
+ connectionInfo.setClientId("client1");
+ connectionInfo.setResponseRequired(true);
+ connectionInfo.setCommandId(commandId++);
+
+ SessionInfo sessionInfo = new SessionInfo(connectionInfo, 1);
+ sessionInfo.setResponseRequired(true);
+ sessionInfo.setCommandId(commandId++);
+
+ ProducerInfo producerInfo = new ProducerInfo(sessionInfo, 1);
+ producerInfo.setResponseRequired(true);
+ producerInfo.setCommandId(commandId++);
+
+ ByteSequence bytes = wf.marshal(wireFormatInfo);
+ final ActiveMQBuffer wireFormatInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+ wireFormatInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+ bytes = wf.marshal(connectionInfo);
+ final ActiveMQBuffer connectionInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+ connectionInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+ bytes = wf.marshal(sessionInfo);
+ final ActiveMQBuffer sessionInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+ sessionInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+ bytes = wf.marshal(producerInfo);
+ final ActiveMQBuffer producerInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+ producerInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+ RemoveInfo removeInfo = connectionInfo.createRemoveCommand();
+ removeInfo.setCommandId(commandId++);
+ removeInfo.setResponseRequired(true);
+ bytes = wf.marshal(removeInfo);
+ final ActiveMQBuffer removeInfoBuffer = new ChannelBufferWrapper(Unpooled.buffer(bytes.length));
+ removeInfoBuffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+
+ Connection connection = Mockito.mock(Connection.class);
+ Mockito.doReturn(new ChannelBufferWrapper(Unpooled.buffer(1024))).when(connection).createTransportBuffer(Mockito.anyInt());
+
+ // in a loop do create connection/session/producer/remove to expose thread state sharing visibility
+ for (int i = 0; i < 5000; i++) {
+
+ final CountDownLatch okResponses = new CountDownLatch(3);
+ final CountDownLatch okResponsesWithRemove = new CountDownLatch(4);
+
+ OpenWireConnection openWireConnection = new OpenWireConnection(connection, server, openWireProtocolManager, wf, orderedDelegate) {
+ @Override
+ public void physicalSend(Command command) throws IOException {
+ if (command.isResponse()) {
+ if (!((Response)command).isException()) {
+ okResponses.countDown();
+ okResponsesWithRemove.countDown();
+ }
+ }
+ }
+ };
+
+ openWireConnection.bufferReceived(openWireConnection, wireFormatInfoBuffer);
+ openWireConnection.bufferReceived(openWireConnection, connectionInfoBuffer);
+ // actor tasks
+ openWireConnection.bufferReceived(openWireConnection, sessionInfoBuffer);
+ openWireConnection.bufferReceived(openWireConnection, producerInfoBuffer);
+
+ assertTrue("fail on ok response check, iteration: " + i, okResponses.await(10, TimeUnit.SECONDS));
+
+ openWireConnection.bufferReceived(openWireConnection, removeInfoBuffer);
+
+ assertTrue("fail on ok response check with remove, iteration: " + i, okResponsesWithRemove.await(10, TimeUnit.SECONDS));
+
+ wireFormatInfoBuffer.resetReaderIndex();
+ connectionInfoBuffer.resetReaderIndex();
+ sessionInfoBuffer.resetReaderIndex();
+ producerInfoBuffer.resetReaderIndex();
+ removeInfoBuffer.resetReaderIndex();
+
+ }
+ }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
index 78bc50d..6950120 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java
@@ -75,7 +75,7 @@ import org.jboss.logging.Logger;
* {@link ClusterConnectionImpl}. As a node is discovered a new {@link org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionBridge} is
* deployed.
*/
-public final class ClusterManager implements ActiveMQComponent {
+public class ClusterManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(ClusterManager.class);