You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/17 19:09:23 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1042 - support amqp failover list
ARTEMIS-1042 - support amqp failover list
https://issues.apache.org/jira/browse/ARTEMIS-1042
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6f73b0c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6f73b0c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6f73b0c
Branch: refs/heads/master
Commit: c6f73b0c0a3156d4c83c481f82f1c12b015a653d
Parents: 0d59d2c
Author: Andy Taylor <an...@gmail.com>
Authored: Tue Feb 28 10:05:39 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 17 15:08:58 2017 -0400
----------------------------------------------------------------------
.../core/client/impl/TopologyMemberImpl.java | 18 +++
.../amqp/broker/AMQPConnectionCallback.java | 13 ++
.../amqp/proton/AMQPConnectionContext.java | 24 ++++
.../protocol/amqp/proton/AmqpSupport.java | 6 +
.../integration/amqp/AmqpNettyFailoverTest.java | 140 +++++++++++++++++++
5 files changed, 201 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
index cf62e17..c82a939 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.client.impl;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair;
@@ -129,6 +131,22 @@ public final class TopologyMemberImpl implements TopologyMember {
return "tcp://" + host + ":" + port;
}
+ /**
+  * Describes this member's backup connector as a URI of the form
+  * {@code tcp://<host>:<port>?<SSL_ENABLED_PROP_NAME>=<flag>}.
+  * <p>
+  * Host, port and SSL flag are looked up in the backup connector's parameters through
+  * {@link ConfigurationHelper}, with {@code "localhost"}, {@code 0} and {@code false} passed as
+  * the respective defaults.
+  *
+  * @return the backup URI, or {@code null} when {@link #getBackup()} returns {@code null} or the
+  *         assembled string cannot be parsed as a URI
+  */
+ public URI toBackupURI() {
+    final TransportConfiguration backup = getBackup();
+    if (backup == null) {
+       return null;
+    }
+
+    final Map<String, Object> params = backup.getParams();
+    final String backupHost = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", params);
+    final int backupPort = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, params);
+    final boolean ssl = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, false, params);
+
+    final String location = "tcp://" + backupHost + ":" + backupPort + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=" + ssl;
+    try {
+       return new URI(location);
+    } catch (URISyntaxException e) {
+       // An unparsable location is reported to the caller the same way as a missing backup.
+       return null;
+    }
+ }
+
@Override
public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 850671a..7e7dc60 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
+import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,10 +31,13 @@ import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
+import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -249,4 +253,13 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
return new XidImpl("amqp".getBytes(), 1, bytes);
}
+ /**
+  * Resolves the backup of this server from the topology of the default cluster connection.
+  *
+  * @return the URI produced by {@link TopologyMemberImpl#toBackupURI()} for this server's own
+  *         topology member, or {@code null} when there is no default cluster connection, this
+  *         node is not found in its topology, or the member yields no backup URI
+  */
+ public URI getFailoverList() {
+    ClusterManager clusterManager = server.getClusterManager();
+    ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null);
+    if (clusterConnection == null) {
+       return null;
+    }
+
+    TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString());
+    // Guard against a topology that does not (yet) contain this node: returning null here means
+    // "no failover target", whereas dereferencing a missing member would throw.
+    return member == null ? null : member.toBackupURI();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index bac3e7e..d6cab99 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
+import java.net.URI;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -24,6 +26,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@@ -47,6 +50,12 @@ import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
+import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
+
public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@@ -206,6 +215,21 @@ public class AMQPConnectionContext extends ProtonInitializable {
}
+ /**
+  * Returns the capabilities offered on this connection.
+  * <p>
+  * As a side effect, when the connection callback reports a failover URI, a single-entry list
+  * describing that host (network host, hostname, port and {@code amqp}/{@code amqps} scheme) is
+  * stored in {@code connectionProperties} under {@code FAILOVER_SERVER_LIST}.
+  *
+  * @return the capabilities from {@code ExtCapability.getCapabilities()}
+  */
public Symbol[] getConnectionCapabilitiesOffered() {
+    URI tc = connectionCallback.getFailoverList();
+    if (tc != null) {
+       // The SSL flag is carried in the URI query; getQuery() is null for a URI without one,
+       // in which case the target is advertised as plain "amqp".
+       String query = tc.getQuery();
+       boolean isSSL = query != null && query.contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
+
+       Map<Symbol, Object> hostDetails = new HashMap<>();
+       hostDetails.put(NETWORK_HOST, tc.getHost());
+       hostDetails.put(SCHEME, isSSL ? "amqps" : "amqp");
+       hostDetails.put(HOSTNAME, tc.getHost());
+       hostDetails.put(PORT, tc.getPort());
+
+       connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
+    }
return ExtCapability.getCapabilities();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index fa77ad5..227ee5d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -56,6 +56,12 @@ public class AmqpSupport {
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
+ // Keys of the per-host details map that AMQPConnectionContext builds for a failover target.
+ static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+ static final Symbol PORT = Symbol.valueOf("port");
+ static final Symbol SCHEME = Symbol.valueOf("scheme");
+ static final Symbol HOSTNAME = Symbol.valueOf("hostname");
+
+ // Connection property key under which AMQPConnectionContext stores the list of host details maps.
+ static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
// Symbols used in configuration of newly opened links.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6f73b0c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
new file mode 100644
index 0000000..f264269
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link FailoverTest} variant that uses Netty acceptors and connectors and connects with the
+ * Qpid JMS (AMQP) client. The class is parameterized so that it runs once with plain transports
+ * and once with SSL-enabled transports.
+ */
+@RunWith(Parameterized.class)
+public class AmqpNettyFailoverTest extends FailoverTest {
+
+   /**
+    * Supplies one row per run as {display name, protocol}: protocol {@code 0} selects plain
+    * transports, {@code 1} selects SSL-enabled ones.
+    */
+   @Parameterized.Parameters(name = "{0}")
+   public static Collection<Object[]> getParameters() {
+      return Arrays.asList(new Object[][]{{"NON_SSL", 0}, {"SSL", 1}});
+   }
+
+   private final int protocol;
+
+   /**
+    * @param name     label of the parameter row; it only feeds the runner's display name
+    * @param protocol {@code 0} for plain transports, {@code 1} for SSL
+    */
+   public AmqpNettyFailoverTest(String name, int protocol) {
+      this.protocol = protocol;
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return getNettyAcceptorTransportConfig(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return getNettyConnectorTransportConfig(live);
+   }
+
+   /**
+    * Sends one message before and one after crashing the live server over the same JMS
+    * connection, then checks that both messages are received in order.
+    */
+   @Test
+   public void testFailoverListWithAMQP() throws Exception {
+      JmsConnectionFactory factory = getJmsConnectionFactory();
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue queue = session.createQueue(ADDRESS.toString());
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello before failover"));
+
+         liveServer.crash(true, true);
+
+         // The factory URI names a single broker only, so this send presumably succeeds only if
+         // the client learned about the backup from the broker - TODO confirm against client docs.
+         producer.send(session.createTextMessage("hello after failover"));
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         connection.start();
+
+         TextMessage receive = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals("hello before failover", receive.getText());
+
+         // Check for null first so that a missing message fails the assertion instead of
+         // raising a NullPointerException from getText().
+         receive = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(receive);
+         Assert.assertEquals("hello after failover", receive.getText());
+      }
+   }
+
+   /**
+    * Creates the Qpid JMS factory for the current parameter row: a plain {@code amqp} failover
+    * URI for protocol 0, otherwise an {@code amqps} failover URI carrying the client key and
+    * trust stores loaded from the classpath.
+    */
+   private JmsConnectionFactory getJmsConnectionFactory() {
+      if (protocol == 0) {
+         return new JmsConnectionFactory("failover:(amqp://localhost:61616)");
+      } else {
+         String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile();
+         String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile();
+         return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)");
+      }
+   }
+
+   /**
+    * Builds the Netty acceptor configuration; SSL settings are added for protocol 1 and the
+    * non-live server is moved to the port right after the default one.
+    */
+   private TransportConfiguration getNettyAcceptorTransportConfig(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+      if (protocol == 1) {
+         server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true");
+
+         server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "server-side-keystore.jks");
+         server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+         server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks");
+         server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+      }
+      if (!live) {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT + 1);
+      }
+      return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
+   }
+
+   /**
+    * Builds the Netty connector configuration; SSL settings are added for protocol 1 and the
+    * non-live connector targets the port right after the default one.
+    */
+   private TransportConfiguration getNettyConnectorTransportConfig(final boolean live) {
+      Map<String, Object> server1Params = new HashMap<>();
+      if (protocol == 1) {
+         server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
+         server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "client-side-truststore.jks");
+         server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
+         server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
+         server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
+      }
+      if (!live) {
+         server1Params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT + 1);
+      }
+      return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
+   }
+
+}