You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/17 19:09:23 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1042 - support amqp failover list

ARTEMIS-1042 - support amqp failover list

https://issues.apache.org/jira/browse/ARTEMIS-1042


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6f73b0c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6f73b0c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6f73b0c

Branch: refs/heads/master
Commit: c6f73b0c0a3156d4c83c481f82f1c12b015a653d
Parents: 0d59d2c
Author: Andy Taylor <an...@gmail.com>
Authored: Tue Feb 28 10:05:39 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 17 15:08:58 2017 -0400

----------------------------------------------------------------------
 .../core/client/impl/TopologyMemberImpl.java    |  18 +++
 .../amqp/broker/AMQPConnectionCallback.java     |  13 ++
 .../amqp/proton/AMQPConnectionContext.java      |  24 ++++
 .../protocol/amqp/proton/AmqpSupport.java       |   6 +
 .../integration/amqp/AmqpNettyFailoverTest.java | 140 +++++++++++++++++++
 5 files changed, 201 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index cf62e17..c82a939 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.Pair;
@@ -129,6 +131,22 @@ public final class TopologyMemberImpl implements TopologyMember {
       return "tcp://" + host + ":" + port;
    }
 
+   public URI toBackupURI() {
+      TransportConfiguration backupConnector = getBackup();
+      if (backupConnector == null) {
+         return null;
+      }
+      Map<String, Object> props = backupConnector.getParams();
+      String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", props);
+      int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, props);
+      boolean sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, false, props);
+      try {
+         return new URI("tcp://" + host + ":" + port + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=" + sslEnabled);
+      } catch (URISyntaxException e) {
+         return null;
+      }
+   }
+
    @Override
    public String toString() {
       return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 850671a..7e7dc60 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,10 +31,13 @@ import io.netty.channel.ChannelFutureListener;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -249,4 +253,13 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
       return new XidImpl("amqp".getBytes(), 1, bytes);
    }
 
+   public URI getFailoverList() {
+      ClusterManager clusterManager = server.getClusterManager();
+      ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null);
+      if (clusterConnection != null) {
+         TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString());
+         return member.toBackupURI();
+      }
+      return null;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index bac3e7e..d6cab99 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import java.net.URI;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -24,6 +26,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -47,6 +50,12 @@ import org.jboss.logging.Logger;
 
 import io.netty.buffer.ByteBuf;
 
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+
 public class AMQPConnectionContext extends ProtonInitializable {
 
    private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -206,6 +215,21 @@ public class AMQPConnectionContext extends ProtonInitializable {
    }
 
    public Symbol[] getConnectionCapabilitiesOffered() {
+      URI tc = connectionCallback.getFailoverList();
+      if (tc != null) {
+         Map<Symbol,Object> hostDetails = new HashMap<>();
+         hostDetails.put(NETWORK_HOST, tc.getHost());
+         boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
+         if (isSSL) {
+            hostDetails.put(SCHEME, "amqps");
+         } else {
+            hostDetails.put(SCHEME, "amqp");
+         }
+         hostDetails.put(HOSTNAME, tc.getHost());
+         hostDetails.put(PORT, tc.getPort());
+
+         connectionProperties.put(FAILOVER_SERVER_LIST,  Arrays.asList(hostDetails));
+      }
       return ExtCapability.getCapabilities();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index fa77ad5..227ee5d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -56,6 +56,12 @@ public class AmqpSupport {
    public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
    public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
    public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
+   static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+   static final Symbol PORT = Symbol.valueOf("port");
+   static final Symbol SCHEME = Symbol.valueOf("scheme");
+   static final Symbol HOSTNAME = Symbol.valueOf("hostname");
+
+   static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
 
 
    // Symbols used in configuration of newly opened links.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
new file mode 100644
index 0000000..f264269
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class AmqpNettyFailoverTest extends FailoverTest {
+
+
+   // this will ensure that all tests in this class are run twice,
+   // once with "true" passed to the class' constructor and once with "false"
+   @Parameterized.Parameters(name = "{0}")
+   public static Collection getParameters() {
+
+      // these 3 are for comparison
+      return Arrays.asList(new Object[][]{{"NON_SSL", 0}, {"SSL", 1}});
+   }
+
+
+   private final int protocol;
+
+   public AmqpNettyFailoverTest(String name, int protocol) {
+      this.protocol = protocol;
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfig(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfig(live);
+   }
+
+
+   @Test
+   public void testFailoverListWithAMQP() throws Exception {
+      JmsConnectionFactory factory = getJmsConnectionFactory();
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(ADDRESS.toString());
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello before failover"));
+         liveServer.crash(true, true);
+         producer.send(session.createTextMessage("hello after failover"));
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+         TextMessage receive = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals("hello before failover", receive.getText());
+         receive = (TextMessage) consumer.receive(5000);
+         Assert.assertEquals("hello after failover", receive.getText());
+         Assert.assertNotNull(receive);
+      }
+   }
+
+   private JmsConnectionFactory getJmsConnectionFactory() {
+      if (protocol == 0) {
+         return new JmsConnectionFactory("failover:(amqp://localhost:61616)");
+      } else {
+         String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile();
+         String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile();
+         // return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false");
+         return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)");
+      }
+   }
+
+   private TransportConfiguration getNettyAcceptorTransportConfig(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+      if (protocol == 1) {
+         server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+
+         server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "server-side-keystore.jks");
+         server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+         server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks");
+         server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+         //server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
+
+      }
+      if (live) {
+         return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+      }
+
+
+      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+
+      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+   }
+
+   private TransportConfiguration getNettyConnectorTransportConfig(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+      if (protocol == 1) {
+         server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+         server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
+         server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "client-side-truststore.jks");
+         server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+         server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
+         server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+      }
+      if (live) {
+         return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+      }
+      server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+   }
+
+}