You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/10/07 09:55:33 UTC

[activemq-artemis] branch main updated: ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour

This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d7f37ae  ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour
d7f37ae is described below

commit d7f37ae313bd3f26ed58ee08d2ca562c604b04b1
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Oct 6 18:01:59 2021 +0100

    ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour
---
 .../protocol/amqp/proton/AMQPRedirectHandler.java  |  20 ++-
 .../artemis/protocol/amqp/proton/AmqpSupport.java  |   4 +-
 .../integration/balancing/AmqpRedirectTest.java    | 192 +++++++++++++++++++++
 .../tests/integration/balancing/RedirectTest.java  |  34 ++--
 4 files changed, 228 insertions(+), 22 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
index d852a3a..bab287f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java
@@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
@@ -44,7 +45,10 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
       ErrorCondition error = new ErrorCondition();
       error.setCondition(ConnectionError.CONNECTION_FORCED);
       error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
-      context.getProtonConnection().setCondition(error);
+
+      Connection protonConnection = context.getProtonConnection();
+      protonConnection.setCondition(error);
+      addConnectionOpenFailureHint(protonConnection);
    }
 
    @Override
@@ -55,10 +59,20 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
       ErrorCondition error = new ErrorCondition();
       error.setCondition(ConnectionError.REDIRECT);
       error.setDescription(String.format("Connection redirected to %s:%d by broker balancer %s", host, port, context.getConnection().getTransportConnection().getRedirectTo()));
-      Map info = new HashMap();
+      Map<Symbol, Object>  info = new HashMap<>();
       info.put(AmqpSupport.NETWORK_HOST, host);
       info.put(AmqpSupport.PORT, port);
       error.setInfo(info);
-      context.getProtonConnection().setCondition(error);
+
+      Connection protonConnection = context.getProtonConnection();
+      protonConnection.setCondition(error);
+      addConnectionOpenFailureHint(protonConnection);
+   }
+
+   private void addConnectionOpenFailureHint(Connection connection) {
+      Map<Symbol, Object> connProps = new HashMap<>();
+      connProps.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);
+
+      connection.setProperties(connProps);
    }
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 3da8f87..f6189d0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -68,8 +68,8 @@ public class AmqpSupport {
    public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
    public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
    public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
-   static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
-   static final Symbol PORT = Symbol.valueOf("port");
+   public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+   public static final Symbol PORT = Symbol.valueOf("port");
    static final Symbol SCHEME = Symbol.valueOf("scheme");
    static final Symbol HOSTNAME = Symbol.valueOf("hostname");
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
new file mode 100644
index 0000000..e376848
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.balancing;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
+import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Connection;
+import org.junit.Test;
+
+/**
+ * Note: The primary balancer tests for AMQP clients are in, for example, {@link RedirectTest} along with those for other protocols.
+ *
+ * This class only adds some additional validations that are AMQP-specific.
+ */
+public class AmqpRedirectTest extends BalancingTestBase {
+
+   @Test
+   public void testBalancerRejectionDueToOfflineTargetPool() throws Exception {
+      setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+      setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+
+      // Zero quorum size to avoid the quorum delay, given it will never be satisfied
+      setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 0, 1);
+
+      // Only start the balancer, so it can never become ready to redirect.
+      startServers(0);
+
+      URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
+      AmqpClient client = new AmqpClient(uri, "admin", "admin");
+
+      AmqpConnection connection = client.createConnection();
+      connection.setContainerId(getName());
+
+      connection.setStateInspector(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
+               markAsInvalid("Broker did not set connection establishment failed hint");
+            }
+         }
+
+         @Override
+         public void inspectClosedResource(Connection connection) {
+            ErrorCondition remoteError = connection.getRemoteCondition();
+            if (remoteError == null || remoteError.getCondition() == null) {
+               markAsInvalid("Broker did not add error condition for connection");
+               return;
+            }
+
+            if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) {
+               markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED);
+               return;
+            }
+
+            String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + " is not ready to redirect";
+            String actualDescription = remoteError.getDescription();
+            if (!expectedDescription.equals(actualDescription)) {
+               markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
+               return;
+            }
+         }
+      });
+
+      try {
+         connection.connect();
+         fail("Expected connection to fail, without redirect");
+      } catch (Exception e) {
+         // Expected
+      }
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+
+      stopServers(0);
+   }
+
+   @Test
+   public void testBalancerRedirectDetails() throws Exception {
+      setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+      setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
+
+      setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1);
+
+      startServers(0, 1);
+
+      URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
+      AmqpClient client = new AmqpClient(uri, "admin", "admin");
+
+      AmqpConnection connection = client.createConnection();
+      connection.setContainerId(getName());
+
+      connection.setStateInspector(new AmqpValidator() {
+
+         @Override
+         public void inspectOpenedResource(Connection connection) {
+            if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
+               markAsInvalid("Broker did not set connection establishment failed hint");
+            }
+         }
+
+         @Override
+         public void inspectClosedResource(Connection connection) {
+            ErrorCondition remoteError = connection.getRemoteCondition();
+            if (remoteError == null || remoteError.getCondition() == null) {
+               markAsInvalid("Broker did not add error condition for connection");
+               return;
+            }
+
+            if (!remoteError.getCondition().equals(ConnectionError.REDIRECT)) {
+               markAsInvalid("Broker did not set condition to " + ConnectionError.REDIRECT);
+               return;
+            }
+
+            Integer redirectPort = TransportConstants.DEFAULT_PORT + 1;
+
+            String expectedDescription = "Connection redirected to localhost:" + redirectPort + " by broker balancer " + BROKER_BALANCER_NAME;
+            String actualDescription = remoteError.getDescription();
+            if (!expectedDescription.equals(actualDescription)) {
+               markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
+               return;
+            }
+
+            // Validate the info map contains expected redirect info
+            Map<?, ?> infoMap = remoteError.getInfo();
+            if (infoMap == null) {
+               markAsInvalid("Broker did not set an info map on condition with redirect details");
+               return;
+            }
+
+            if (!infoMap.containsKey(AmqpSupport.NETWORK_HOST)) {
+               markAsInvalid("Info map does not contain key " + AmqpSupport.NETWORK_HOST);
+               return;
+            } else {
+               Object value = infoMap.get(AmqpSupport.NETWORK_HOST);
+               if (!"localhost".equals(value)) {
+                  markAsInvalid("Info map does not contain expected network-host value, was: " + value);
+                  return;
+               }
+            }
+
+            if (!infoMap.containsKey(AmqpSupport.PORT)) {
+               markAsInvalid("Info map does not contain key " + AmqpSupport.PORT);
+               return;
+            } else {
+               Object value = infoMap.get(AmqpSupport.PORT);
+               if (value == null || !redirectPort.equals(value)) {
+                  markAsInvalid("Info map does not contain expected port value, was: " + value);
+                  return;
+               }
+            }
+         }
+      });
+
+      try {
+         connection.connect();
+         fail("Expected connection to fail, with redirect");
+      } catch (Exception e) {
+         // Expected
+      }
+
+      connection.getStateInspector().assertValid();
+      connection.close();
+
+      stopServers(0, 1);
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
index 185c552..4c18af3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java
@@ -183,7 +183,7 @@ public class RedirectTest extends BalancingTestBase {
          queueControls[node] = (QueueControl)getServer(node).getManagementService()
             .getResource(ResourceNames.QUEUE + queueName);
 
-         Assert.assertEquals(0, queueControls[node].countMessages());
+         Assert.assertEquals("Unexpected messagecount for node " + node, 0, queueControls[node].countMessages());
       }
 
 
@@ -229,7 +229,7 @@ public class RedirectTest extends BalancingTestBase {
       }
 
       for (int node : nodes) {
-         Assert.assertEquals(0, queueControls[node].countMessages());
+         Assert.assertEquals("Unexpected message count for node " + node, 0, queueControls[node].countMessages());
       }
 
       stopServers(nodes);
@@ -266,8 +266,8 @@ public class RedirectTest extends BalancingTestBase {
       QueueControl queueControl1 = (QueueControl)getServer(1).getManagementService()
          .getResource(ResourceNames.QUEUE + queueName);
 
-      Assert.assertEquals(0, queueControl0.countMessages());
-      Assert.assertEquals(0, queueControl1.countMessages());
+      Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+      Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
 
       ConnectionFactory connectionFactory0 = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
          TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
@@ -302,8 +302,8 @@ public class RedirectTest extends BalancingTestBase {
          }
       }
 
-      Assert.assertEquals(0, queueControl0.countMessages());
-      Assert.assertEquals(0, queueControl1.countMessages());
+      Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+      Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
 
       stopServers(0, 1);
    }
@@ -339,9 +339,9 @@ public class RedirectTest extends BalancingTestBase {
       QueueControl queueControl2 = (QueueControl)getServer(2).getManagementService()
          .getResource(ResourceNames.QUEUE + queueName);
 
-      Assert.assertEquals(0, queueControl0.countMessages());
-      Assert.assertEquals(0, queueControl1.countMessages());
-      Assert.assertEquals(0, queueControl2.countMessages());
+      Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+      Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
+      Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
 
       int failedNode;
       ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
@@ -370,9 +370,9 @@ public class RedirectTest extends BalancingTestBase {
 
       startServers(failedNode);
 
-      Assert.assertEquals(0, queueControl0.countMessages());
-      Assert.assertEquals(1, queueControl1.countMessages());
-      Assert.assertEquals(1, queueControl2.countMessages());
+      Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
+      Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
+      Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
 
       try (Connection connection = connectionFactory.createConnection()) {
          connection.start();
@@ -385,13 +385,13 @@ public class RedirectTest extends BalancingTestBase {
          }
       }
 
-      Assert.assertEquals(0, queueControl0.countMessages());
+      Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
       if (failedNode == 1) {
-         Assert.assertEquals(1, queueControl1.countMessages());
-         Assert.assertEquals(0, queueControl2.countMessages());
+         Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
+         Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
       } else {
-         Assert.assertEquals(0, queueControl1.countMessages());
-         Assert.assertEquals(1, queueControl2.countMessages());
+         Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
+         Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
       }
 
       stopServers(0, 1, 2);