You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/30 19:10:52 UTC
[2/2] activemq-artemis git commit: ARTEMIS-2064 make address & queue
deployment more robust
ARTEMIS-2064 make address & queue deployment more robust
Previously, any failure to deploy an address or queue would short-circuit
the broker initialization process, preventing any other addresses or queues
from being deployed, as well as other critical resources such as acceptors.
Such failures are now caught and logged as warnings so that initialization
can continue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b0d30d4d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b0d30d4d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b0d30d4d
Branch: refs/heads/master
Commit: b0d30d4da5708e2f46f9cb747e0b380d05f94526
Parents: 611cedf
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Aug 29 21:03:00 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 30 15:10:44 2018 -0400
----------------------------------------------------------------------
.../core/config/CoreAddressConfiguration.java | 10 ++++
.../core/config/CoreQueueConfiguration.java | 24 ++++++++
.../config/impl/LegacyJMSConfiguration.java | 30 +++++-----
.../deployers/impl/FileConfigurationParser.java | 3 +-
.../core/server/ActiveMQServerLogger.java | 10 ++++
.../core/server/impl/ActiveMQServerImpl.java | 58 +++++++++++---------
.../byteman/AddressDeploymentFailedTest.java | 45 +++++++++++++++
.../byteman/QueueDeploymentFailedTest.java | 47 ++++++++++++++++
8 files changed, 182 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java
index 290d483..069222a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java
@@ -65,4 +65,14 @@ public class CoreAddressConfiguration implements Serializable {
public List<CoreQueueConfiguration> getQueueConfigurations() {
return queueConfigurations;
}
+
+ @Override
+ public String toString() {
+ return "CoreAddressConfiguration[" +
+ "name=" + name +
+ ", routingTypes=" + routingTypes +
+ ", queueConfigurations=" + queueConfigurations +
+ "]";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index 2ccae2d..87e938e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -201,6 +201,7 @@ public class CoreQueueConfiguration implements Serializable {
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
+ result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
return result;
}
@@ -265,6 +266,29 @@ public class CoreQueueConfiguration implements Serializable {
} else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch)) {
return false;
}
+ if (routingType == null) {
+ if (other.routingType != null)
+ return false;
+ } else if (!routingType.equals(other.routingType)) {
+ return false;
+ }
return true;
}
+
+ @Override
+ public String toString() {
+ return "CoreQueueConfiguration[" +
+ "name=" + name +
+ ", address=" + address +
+ ", routingType=" + routingType +
+ ", durable=" + durable +
+ ", filterString=" + filterString +
+ ", maxConsumers=" + maxConsumers +
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers +
+ ", exclusive=" + exclusive +
+ ", lastValue=" + lastValue +
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch +
+ ", delayBeforeDispatch=" + delayBeforeDispatch +
+ "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java
index a79402c..dc50917 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java
@@ -21,7 +21,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.URL;
-import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -150,10 +149,9 @@ public class LegacyJMSConfiguration implements Deployable {
*/
public void parseTopicConfiguration(final Node node) throws Exception {
String topicName = node.getAttributes().getNamedItem(NAME_ATTR).getNodeValue();
- List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
- coreAddressConfigurations.add(new CoreAddressConfiguration()
- .setName(topicName)
- .addRoutingType(RoutingType.MULTICAST));
+ configuration.addAddressConfiguration(new CoreAddressConfiguration()
+ .setName(topicName)
+ .addRoutingType(RoutingType.MULTICAST));
}
/**
@@ -173,22 +171,22 @@ public class LegacyJMSConfiguration implements Deployable {
for (int i = 0; i < children.getLength(); i++) {
Node child = children.item(i);
- if (QUEUE_SELECTOR_NODE_NAME.equals(children.item(i).getNodeName())) {
- Node selectorNode = children.item(i);
+ if (QUEUE_SELECTOR_NODE_NAME.equals(child.getNodeName())) {
+ Node selectorNode = child;
Node attNode = selectorNode.getAttributes().getNamedItem("string");
selectorString = attNode.getNodeValue();
}
}
- List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
- coreAddressConfigurations.add(new CoreAddressConfiguration()
- .setName(queueName)
- .addRoutingType(RoutingType.ANYCAST)
- .addQueueConfiguration(new CoreQueueConfiguration()
- .setAddress(queueName)
- .setName(queueName)
- .setFilterString(selectorString)
- .setRoutingType(RoutingType.ANYCAST)));
+ configuration.addAddressConfiguration(new CoreAddressConfiguration()
+ .setName(queueName)
+ .addRoutingType(RoutingType.ANYCAST)
+ .addQueueConfiguration(new CoreQueueConfiguration()
+ .setAddress(queueName)
+ .setName(queueName)
+ .setFilterString(selectorString)
+ .setDurable(durable)
+ .setRoutingType(RoutingType.ANYCAST)));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index dc5ba16..b1b2c0e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -770,8 +770,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Element node = (Element) elements.item(0);
NodeList list = node.getElementsByTagName("address");
for (int i = 0; i < list.getLength(); i++) {
- CoreAddressConfiguration addrConfig = parseAddressConfiguration(list.item(i));
- config.getAddressConfigurations().add(addrConfig);
+ config.addAddressConfiguration(parseAddressConfiguration(list.item(i)));
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 6aa53f6..e6b7b48 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1593,6 +1593,16 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT)
void bridgeAddressFull(String addressName, String bridgeName);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222274, value = "Failed to deploy address {0}: {1}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void problemDeployingAddress(String addressName, String message);
+
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222275, value = "Failed to deploy queue {0}: {1}",
+ format = Message.Format.MESSAGE_FORMAT)
+ void problemDeployingQueue(String queueName, String message);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 62d3c6e..6cb0515 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2742,39 +2742,43 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private void deployAddressesFromConfiguration(Configuration configuration) throws Exception {
for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) {
- AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
- addOrUpdateAddressInfo(info);
- ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
- deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
+ try {
+ ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString());
+ AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes());
+ addOrUpdateAddressInfo(info);
+ deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations());
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage());
+ }
}
}
private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
for (CoreQueueConfiguration config : queues) {
- SimpleString queueName = SimpleString.toSimpleString(config.getName());
- ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
- AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
- // determine if there is an address::queue match; update it if so
- int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
- boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
- boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
- int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
- long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
-
- if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
- updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(),
- isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
- } else {
- // if the address::queue doesn't exist then create it
- try {
- createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
- queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
- config.isDurable(),false,false,false,false, maxConsumers, config.getPurgeOnNoConsumers(),
- isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
- } catch (ActiveMQQueueExistsException e) {
- // the queue may exist on a *different* address
- ActiveMQServerLogger.LOGGER.warn(e.getMessage());
+ try {
+ SimpleString queueName = SimpleString.toSimpleString(config.getName());
+ ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString());
+ AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
+ // determine if there is an address::queue match; update it if so
+ int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
+ boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
+ boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
+ int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
+ long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
+
+ if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
+ updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
+ } else {
+ // if the address::queue doesn't exist then create it
+ try {
+ createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
+ } catch (ActiveMQQueueExistsException e) {
+ // the queue may exist on a *different* address
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage());
+ }
}
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.problemDeployingQueue(config.getName(), e.getMessage());
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java
new file mode 100644
index 0000000..0d19158
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class AddressDeploymentFailedTest extends ActiveMQTestBase {
+
+ @Test
+ @BMRule(name = "blow up address deployment",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
+ targetMethod = "addOrUpdateAddressInfo(AddressInfo)",
+ targetLocation = "EXIT",
+ action = "throw new IllegalStateException(\"test exception\")")
+ public void testAddressDeploymentFailure() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultNettyConfig());
+ server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(UUID.randomUUID().toString()).addRoutingType(RoutingType.ANYCAST));
+ server.start();
+ assertTrue(server.getRemotingService().isStarted());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java
new file mode 100644
index 0000000..d01215a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.byteman;
+
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class QueueDeploymentFailedTest extends ActiveMQTestBase {
+
+ @Test
+ @BMRule(name = "blow up queue deployment",
+ targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl",
+ targetMethod = "createQueue(SimpleString,RoutingType,SimpleString,SimpleString,SimpleString,boolean,boolean,boolean,boolean,boolean,int,boolean,boolean,boolean,int,long,boolean",
+ targetLocation = "EXIT",
+ action = "throw new IllegalStateException(\"test exception\")")
+ public void testQueueDeploymentFailure() throws Exception {
+ ActiveMQServer server = createServer(false, createDefaultNettyConfig());
+ String address = UUID.randomUUID().toString();
+ server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(address).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new CoreQueueConfiguration().setName(UUID.randomUUID().toString()).setRoutingType(RoutingType.ANYCAST).setAddress(address)));
+ server.start();
+ assertTrue(server.getRemotingService().isStarted());
+ }
+}