You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/10/07 09:55:33 UTC
[activemq-artemis] branch main updated: ARTEMIS-3520: set the Open
failure hint when balancer redirects or refuses AMQP connection,
add lower level test for balancer related redirect/refusal protocol
behaviour
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new d7f37ae ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour
d7f37ae is described below
commit d7f37ae313bd3f26ed58ee08d2ca562c604b04b1
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Oct 6 18:01:59 2021 +0100
ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour
---
.../protocol/amqp/proton/AMQPRedirectHandler.java | 20 ++-
.../artemis/protocol/amqp/proton/AmqpSupport.java | 4 +-
.../integration/balancing/AmqpRedirectTest.java | 192 +++++++++++++++++++++
.../tests/integration/balancing/RedirectTest.java | 34 ++--
4 files changed, 228 insertions(+), 22 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
index d852a3a..bab287f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
@@ -44,7 +45,10 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
- context.getProtonConnection().setCondition(error);
+
+ Connection protonConnection = context.getProtonConnection();
+ protonConnection.setCondition(error);
+ addConnectionOpenFailureHint(protonConnection);
}
@Override
@@ -55,10 +59,20 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.REDIRECT);
error.setDescription(String.format("Connection redirected to %s:%d by broker balancer %s", host, port, context.getConnection().getTransportConnection().getRedirectTo()));
- Map info = new HashMap();
+ Map<Symbol, Object> info = new HashMap<>();
info.put(AmqpSupport.NETWORK_HOST, host);
info.put(AmqpSupport.PORT, port);
error.setInfo(info);
- context.getProtonConnection().setCondition(error);
+
+ Connection protonConnection = context.getProtonConnection();
+ protonConnection.setCondition(error);
+ addConnectionOpenFailureHint(protonConnection);
+ }
+
+ private void addConnectionOpenFailureHint(Connection connection) {
+ Map<Symbol, Object> connProps = new HashMap<>();
+ connProps.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);
+
+ connection.setProperties(connProps);
}
}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 3da8f87..f6189d0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -68,8 +68,8 @@ public class AmqpSupport {
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
- static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
- static final Symbol PORT = Symbol.valueOf("port");
+ public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+ public static final Symbol PORT = Symbol.valueOf("port");
static final Symbol SCHEME = Symbol.valueOf("scheme");
static final Symbol HOSTNAME = Symbol.valueOf("hostname");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
new file mode 100644
index 0000000..e376848
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+
+/**
+ * Note: The primary balancer tests for AMQP clients are in e.g {@link RedirectTest} along with those for other protocols.
+ *
+ * This class only adds some additional validations that are AMQP-specific.
+ */
+public class AmqpRedirectTest extends BalancingTestBase {
+
+ @Test
+ public void testBalancerRejectionDueToOfflineTargetPool() throws Exception {
+ setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+ setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+
+ // Zero quorum size to avoid the quorum delay, given it will never be satisfied
+ setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 0, 1);
+
+ // Only start the balancer, so it can never become ready to redirect.
+ startServers(0);
+
+ URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
+ AmqpClient client = new AmqpClient(uri, "admin", "admin");
+
+ AmqpConnection connection = client.createConnection();
+ connection.setContainerId(getName());
+
+ connection.setStateInspector(new AmqpValidator() {
+
+ @Override
+ public void inspectOpenedResource(Connection connection) {
+ if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
+ markAsInvalid("Broker did not set connection establishment failed hint");
+ }
+ }
+
+ @Override
+ public void inspectClosedResource(Connection connection) {
+ ErrorCondition remoteError = connection.getRemoteCondition();
+ if (remoteError == null || remoteError.getCondition() == null) {
+ markAsInvalid("Broker did not add error condition for connection");
+ return;
+ }
+
+ if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) {
+ markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED);
+ return;
+ }
+
+ String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + " is not ready to redirect";
+ String actualDescription = remoteError.getDescription();
+ if (!expectedDescription.equals(actualDescription)) {
+ markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
+ return;
+ }
+ }
+ });
+
+ try {
+ connection.connect();
+ fail("Expected connection to fail, without redirect");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+
+ stopServers(0);
+ }
+
+ @Test
+ public void testBalancerRedirectDetails() throws Exception {
+ setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+ setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+
+ setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1);
+
+ startServers(0, 1);
+
+ URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
+ AmqpClient client = new AmqpClient(uri, "admin", "admin");
+
+ AmqpConnection connection = client.createConnection();
+ connection.setContainerId(getName());
+
+ connection.setStateInspector(new AmqpValidator() {
+
+ @Override
+ public void inspectOpenedResource(Connection connection) {
+ if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
+ markAsInvalid("Broker did not set connection establishment failed hint");
+ }
+ }
+
+ @Override
+ public void inspectClosedResource(Connection connection) {
+ ErrorCondition remoteError = connection.getRemoteCondition();
+ if (remoteError == null || remoteError.getCondition() == null) {
+ markAsInvalid("Broker did not add error condition for connection");
+ return;
+ }
+
+ if (!remoteError.getCondition().equals(ConnectionError.REDIRECT)) {
+ markAsInvalid("Broker did not set condition to " + ConnectionError.REDIRECT);
+ return;
+ }
+
+ Integer redirectPort = TransportConstants.DEFAULT_PORT + 1;
+
+ String expectedDescription = "Connection redirected to localhost:" + redirectPort + " by broker balancer " + BROKER_BALANCER_NAME;
+ String actualDescription = remoteError.getDescription();
+ if (!expectedDescription.equals(actualDescription)) {
+ markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
+ return;
+ }
+
+ // Validate the info map contains expected redirect info
+ Map<?, ?> infoMap = remoteError.getInfo();
+ if (infoMap == null) {
+ markAsInvalid("Broker did not set an info map on condition with redirect details");
+ return;
+ }
+
+ if (!infoMap.containsKey(AmqpSupport.NETWORK_HOST)) {
+ markAsInvalid("Info map does not contain key " + AmqpSupport.NETWORK_HOST);
+ return;
+ } else {
+ Object value = infoMap.get(AmqpSupport.NETWORK_HOST);
+ if (!"localhost".equals(value)) {
+ markAsInvalid("Info map does not contain expected network-host value, was: " + value);
+ return;
+ }
+ }
+
+ if (!infoMap.containsKey(AmqpSupport.PORT)) {
+ markAsInvalid("Info map does not contain key " + AmqpSupport.PORT);
+ return;
+ } else {
+ Object value = infoMap.get(AmqpSupport.PORT);
+ if (value == null || !redirectPort.equals(value)) {
+ markAsInvalid("Info map does not contain expected port value, was: " + value);
+ return;
+ }
+ }
+ }
+ });
+
+ try {
+ connection.connect();
+ fail("Expected connection to fail, with redirect");
+ } catch (Exception e) {
+ // Expected
+ }
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+
+ stopServers(0, 1);
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
index 185c552..4c18af3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
@@ -183,7 +183,7 @@ public class RedirectTest extends BalancingTestBase {
queueControls[node] = (QueueControl)getServer(node).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
- Assert.assertEquals(0, queueControls[node].countMessages());
+ Assert.assertEquals("Unexpected messagecount for node " + node, 0, queueControls[node].countMessages());
}
@@ -229,7 +229,7 @@ public class RedirectTest extends BalancingTestBase {
}
for (int node : nodes) {
- Assert.assertEquals(0, queueControls[node].countMessages());
+ Assert.assertEquals("Unexpected message count for node " + node, 0, queueControls[node].countMessages());
}
stopServers(nodes);
@@ -266,8 +266,8 @@ public class RedirectTest extends BalancingTestBase {
QueueControl queueControl1 = (QueueControl)getServer(1).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
- Assert.assertEquals(0, queueControl0.countMessages());
- Assert.assertEquals(0, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
ConnectionFactory connectionFactory0 = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
@@ -302,8 +302,8 @@ public class RedirectTest extends BalancingTestBase {
}
}
- Assert.assertEquals(0, queueControl0.countMessages());
- Assert.assertEquals(0, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
stopServers(0, 1);
}
@@ -339,9 +339,9 @@ public class RedirectTest extends BalancingTestBase {
QueueControl queueControl2 = (QueueControl)getServer(2).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
- Assert.assertEquals(0, queueControl0.countMessages());
- Assert.assertEquals(0, queueControl1.countMessages());
- Assert.assertEquals(0, queueControl2.countMessages());
+ Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
int failedNode;
ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
@@ -370,9 +370,9 @@ public class RedirectTest extends BalancingTestBase {
startServers(failedNode);
- Assert.assertEquals(0, queueControl0.countMessages());
- Assert.assertEquals(1, queueControl1.countMessages());
- Assert.assertEquals(1, queueControl2.countMessages());
+ Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
@@ -385,13 +385,13 @@ public class RedirectTest extends BalancingTestBase {
}
}
- Assert.assertEquals(0, queueControl0.countMessages());
+ Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
if (failedNode == 1) {
- Assert.assertEquals(1, queueControl1.countMessages());
- Assert.assertEquals(0, queueControl2.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
} else {
- Assert.assertEquals(0, queueControl1.countMessages());
- Assert.assertEquals(1, queueControl2.countMessages());
+ Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
+ Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
}
stopServers(0, 1, 2);