You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/15 14:51:59 UTC
[1/4] qpid-broker-j git commit: QPID-7606: Remodel alternateExchange
as alternateBinding
Repository: qpid-broker-j
Updated Branches:
refs/heads/master c2c2bc470 -> e4598dcd6
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
index 2512332..2e471c6 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.queue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
@@ -38,6 +39,9 @@ import javax.jms.Topic;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
@@ -47,6 +51,8 @@ import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
{
+ private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
+ private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
private Connection _connection;
private Session _session;
@@ -122,7 +128,73 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
@Override
public Map<String, Object> getAttributes()
{
- return Collections.<String, Object>singletonMap(Exchange.TYPE, "fanout");
+ return Collections.singletonMap(Exchange.TYPE, "fanout");
+ }
+ },
+
+ new NodeAutoCreationPolicy()
+ {
+ @Override
+ public String getPattern()
+ {
+ return ".*" + DEAD_LETTER_QUEUE_SUFFIX;
+ }
+
+ @Override
+ public boolean isCreatedOnPublish()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isCreatedOnConsume()
+ {
+ return true;
+ }
+
+ @Override
+ public String getNodeType()
+ {
+ return "Queue";
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return Collections.emptyMap();
+ }
+ },
+
+ new NodeAutoCreationPolicy()
+ {
+ @Override
+ public String getPattern()
+ {
+ return ".*" + DEAD_LETTER_EXCHANGE_SUFFIX;
+ }
+
+ @Override
+ public boolean isCreatedOnPublish()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isCreatedOnConsume()
+ {
+ return false;
+ }
+
+ @Override
+ public String getNodeType()
+ {
+ return "Exchange";
+ }
+
+ @Override
+ public Map<String, Object> getAttributes()
+ {
+ return Collections.singletonMap(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
}
}
};
@@ -266,4 +338,102 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
// pass
}
}
+
+ public void testQueueAlternateBindingCreation() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ String queueName = getTestQueueName();
+ String deadLetterQueueName = queueName + DEAD_LETTER_QUEUE_SUFFIX;
+
+ final Map<String, Object> attributes = new HashMap<>();
+ Map<String, Object> expectedAlternateBinding =
+ Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterQueueName);
+ attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
+ new ObjectMapper().writeValueAsString(expectedAlternateBinding));
+ createEntityUsingAmqpManagement(queueName,
+ session,
+ "org.apache.qpid.Queue", attributes);
+
+ Map<String, Object> queueAttributes =
+ managementReadObject(session, "org.apache.qpid.Queue", queueName, true);
+
+ Object actualAlternateBinding = queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING);
+ Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
+ assertEquals("Unexpected alternate binding",
+ new HashMap<>(expectedAlternateBinding),
+ new HashMap<>(actualAlternateBindingMap));
+
+ assertNotNull("Cannot get dead letter queue",
+ managementReadObject(session, "org.apache.qpid.Queue", deadLetterQueueName, true));
+ }
+
+ public void testExchangeAlternateBindingCreation() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ String exchangeName = getTestQueueName();
+ String deadLetterExchangeName = exchangeName + DEAD_LETTER_EXCHANGE_SUFFIX;
+
+ final Map<String, Object> attributes = new HashMap<>();
+ Map<String, Object> expectedAlternateBinding =
+ Collections.singletonMap(AlternateBinding.DESTINATION, deadLetterExchangeName);
+ attributes.put(Exchange.ALTERNATE_BINDING, new ObjectMapper().writeValueAsString(expectedAlternateBinding));
+ attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ createEntityUsingAmqpManagement(exchangeName,
+ session,
+ "org.apache.qpid.DirectExchange", attributes);
+
+ Map<String, Object> exchangeAttributes =
+ managementReadObject(session, "org.apache.qpid.Exchange", exchangeName, true);
+
+ Object actualAlternateBinding = exchangeAttributes.get(Exchange.ALTERNATE_BINDING);
+ Map<String, Object> actualAlternateBindingMap = convertIfNecessary(actualAlternateBinding);
+ assertEquals("Unexpected alternate binding",
+ new HashMap<>(expectedAlternateBinding),
+ new HashMap<>(actualAlternateBindingMap));
+
+ assertNotNull("Cannot get dead letter exchange",
+ managementReadObject(session, "org.apache.qpid.FanoutExchange", deadLetterExchangeName, true));
+ }
+
+ public void testLegacyQueueDeclareArgumentAlternateBindingCreation() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ final Map<String, Object> arguments = Collections.singletonMap(QueueArgumentsConverter.X_QPID_DLQ_ENABLED, true);
+ String testQueueName = getTestQueueName();
+ ((AMQSession<?,?>) session).createQueue(testQueueName, false, true, false, arguments);
+
+
+ Map<String, Object> queueAttributes =
+ managementReadObject(session, "org.apache.qpid.Queue", testQueueName, true);
+
+ Object actualAlternateBinding = queueAttributes.get(Exchange.ALTERNATE_BINDING);
+ assertTrue("Unexpected alternate binding", actualAlternateBinding instanceof Map);
+ Object deadLetterQueueName = ((Map<String, Object>) actualAlternateBinding).get(AlternateBinding.DESTINATION);
+
+ assertNotNull("Cannot get dead letter queue",
+ managementReadObject(session, "org.apache.qpid.Queue", String.valueOf(deadLetterQueueName), true));
+ }
+
+ private Map<String, Object> convertIfNecessary(final Object actualAlternateBinding) throws IOException
+ {
+ Map<String, Object> actualAlternateBindingMap;
+ if (actualAlternateBinding instanceof String)
+ {
+ actualAlternateBindingMap = new ObjectMapper().readValue((String)actualAlternateBinding, Map.class);
+ }
+ else
+ {
+ actualAlternateBindingMap = (Map<String, Object>) actualAlternateBinding;
+ }
+ return actualAlternateBindingMap;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java b/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java
new file mode 100644
index 0000000..86ab13f
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/server/routing/AlternateBindingRoutingTest.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.routing;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.qpid.server.model.AlternateBinding;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class AlternateBindingRoutingTest extends QpidBrokerTestCase
+{
+ public void testFanoutExchangeAsAlternateBinding() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ String queueName = getTestQueueName();
+ String deadLetterQueueName = queueName + "_DeadLetter";
+ String deadLetterExchangeName = "deadLetterExchange";
+
+ Queue deadLetterQueue = createTestQueue(session, deadLetterQueueName);
+
+ createEntityUsingAmqpManagement(deadLetterExchangeName,
+ session,
+ "org.apache.qpid.FanoutExchange");
+
+ final Map<String, Object> arguments = new HashMap<>();
+ arguments.put("destination", deadLetterQueueName);
+ arguments.put("bindingKey", queueName);
+ performOperationUsingAmqpManagement(deadLetterExchangeName,
+ "bind",
+ session,
+ "org.apache.qpid.Exchange",
+ arguments);
+
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
+ attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
+ new ObjectMapper().writeValueAsString(Collections.singletonMap(AlternateBinding.DESTINATION,
+ deadLetterExchangeName)));
+ createEntityUsingAmqpManagement(queueName,
+ session,
+ "org.apache.qpid.StandardQueue",
+ attributes);
+ Queue testQueue = getQueueFromName(session, queueName);
+
+ sendMessage(session, testQueue, 1);
+ assertEquals("Unexpected number of messages on queueName", 1, getQueueDepth(connection, testQueue));
+
+ assertEquals("Unexpected number of messages on DLQ queueName", 0, getQueueDepth(connection, deadLetterQueue));
+
+ performOperationUsingAmqpManagement(queueName,
+ "DELETE",
+ session,
+ "org.apache.qpid.Queue",
+ Collections.emptyMap());
+
+ assertEquals("Unexpected number of messages on DLQ queueName", 1, getQueueDepth(connection, deadLetterQueue));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
index 5d1263f..d7d94e4 100644
--- a/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
@@ -731,7 +731,7 @@ public class VirtualHostMessageStoreTest extends QpidTestCase
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
durable ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING, null);
exchange = _virtualHost.createChild(Exchange.class, attributes);
return exchange;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
index 6b5099f..8b39eda 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/ExchangeRestTest.java
@@ -21,10 +21,12 @@
package org.apache.qpid.systest.rest;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
public class ExchangeRestTest extends QpidRestTestCase
@@ -74,24 +76,24 @@ public class ExchangeRestTest extends QpidRestTestCase
String exchangeName = getTestName();
String exchangeUrl = "exchange/test/test/" + exchangeName;
- Map<String, Object> attributes = new HashMap<String, Object>();
+ Map<String, Object> attributes = new HashMap<>();
attributes.put(Exchange.NAME, exchangeName);
attributes.put(Exchange.TYPE, "direct");
- int responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes);
- assertEquals("Exchange should be created", 201, responseCode);
+ getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes, 201);
Map<String, Object> exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl);
assertNotNull("Exchange not found", exchange);
- attributes = new HashMap<String, Object>();
+ attributes = new HashMap<>();
attributes.put(Exchange.NAME, exchangeName);
- attributes.put(Exchange.ALTERNATE_EXCHANGE, "amq.direct");
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, "amq.direct"));
- responseCode = getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes);
- assertEquals("Exchange update should be supported", 200, responseCode);
+ getRestTestHelper().submitRequest(exchangeUrl, "PUT", attributes, 200);
exchange = getRestTestHelper().getJsonAsSingletonList(exchangeUrl);
assertNotNull("Exchange not found", exchange);
- assertEquals("amq.direct",exchange.get(Exchange.ALTERNATE_EXCHANGE));
+ assertEquals(new HashMap<>(Collections.singletonMap(AlternateBinding.DESTINATION, "amq.direct")),
+ new HashMap<>(((Map<String, Object>) exchange.get(Exchange.ALTERNATE_BINDING))));
}
private void assertExchange(String exchangeName, Map<String, Object> exchange)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
index 712eafc..52fa38c 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
@@ -43,7 +43,6 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.LastValueQueue;
import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.SortedQueue;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.derby.DerbyVirtualHostImpl;
@@ -679,46 +678,6 @@ public class VirtualHostRestTest extends QpidRestTestCase
assertEquals("Unexpected priorities key attribute", 10, priorityQueue.get(PriorityQueue.PRIORITIES));
}
- @SuppressWarnings("unchecked")
- public void testCreateQueueWithDLQEnabled() throws Exception
- {
- String queueName = getTestQueueName();
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
-
- //verify the starting state
- Map<String, Object> hostDetails = getRestTestHelper().getJsonAsSingletonList("virtualhost/test");
- List<Map<String, Object>> queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE);
- List<Map<String, Object>> exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE);
-
- assertNull("queue "+ queueName + " should not have already been present", getRestTestHelper().find(Queue.NAME, queueName , queues));
- assertNull("queue "+ queueName + "_DLQ should not have already been present", getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues));
- assertNull("exchange should not have already been present", getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges));
-
- //create the queue
- createQueue(queueName, "standard", attributes);
-
- //verify the new queue, as well as the DLQueue and DLExchange have been created
- hostDetails = getRestTestHelper().getJsonAsSingletonList("virtualhost/test");
- queues = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_QUEUES_ATTRIBUTE);
- exchanges = (List<Map<String, Object>>) hostDetails.get(VirtualHostRestTest.VIRTUALHOST_EXCHANGES_ATTRIBUTE);
-
- Map<String, Object> queue = getRestTestHelper().find(Queue.NAME, queueName , queues);
- Map<String, Object> dlqQueue = getRestTestHelper().find(Queue.NAME, queueName + "_DLQ" , queues);
- Map<String, Object> dlExchange = getRestTestHelper().find(Exchange.NAME, queueName + "_DLE" , exchanges);
- assertNotNull("queue should have been present", queue);
- assertNotNull("queue should have been present", dlqQueue);
- assertNotNull("exchange should have been present", dlExchange);
-
- //verify that the alternate exchange is set as expected on the new queue
- Map<String, Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.ALTERNATE_EXCHANGE, queueName + "_DLE");
-
- Asserts.assertQueue(queueName, "standard", queue, queueAttributes);
- Asserts.assertQueue(queueName, "standard", queue, null);
- }
-
public void testObjectsWithSlashes() throws Exception
{
String queueName = "testQueue/with/slashes";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java b/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java
index 15461d7..dc35c90 100644
--- a/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/rest/acl/ExchangeRestACLTest.java
@@ -35,6 +35,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet;
import org.apache.qpid.server.model.AccessControlProvider;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.systest.rest.QpidRestTestCase;
@@ -197,33 +198,33 @@ public class ExchangeRestACLTest extends QpidRestTestCase
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int responseCode = createExchange();
+ createExchange();
assertExchangeExists();
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, _exchangeName);
- attributes.put(Exchange.ALTERNATE_EXCHANGE, "my-alternate-exchange");
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, "my-alternate-exchange"));
- responseCode = getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes);
- assertEquals("Exchange 'my-alternate-exchange' does not exist", AbstractServlet.SC_UNPROCESSABLE_ENTITY, responseCode);
+ getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes, AbstractServlet.SC_UNPROCESSABLE_ENTITY);
}
public void testSetExchangeAttributesDenied() throws Exception
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
- int responseCode = createExchange();
+ createExchange();
assertExchangeExists();
getRestTestHelper().setUsernameAndPassword(DENIED_USER, DENIED_USER);
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.NAME, _exchangeName);
- attributes.put(Exchange.ALTERNATE_EXCHANGE, "my-alternate-exchange");
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, "my-alternate-exchange"));
- responseCode = getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes);
- assertEquals("Setting of exchange attribites should be allowed", 403, responseCode);
+ getRestTestHelper().submitRequest(_exchangeUrl, "PUT", attributes, 403);
}
public void testBindToExchangeAllowed() throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index ae6f7ce..c4271fc 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -22,6 +22,7 @@ package org.apache.qpid.test.unit.client;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -29,7 +30,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
-import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -38,14 +38,14 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.Topic;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -71,12 +71,14 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
private static final int MAX_DELIVERY_COUNT = 2;
private CountDownLatch _awaitCompletion;
- protected long _awaitEmptyQueue;
- protected long _awaitCompletionTimeout = 20;
+ private long _awaitEmptyQueue;
+ private long _awaitCompletionTimeout = 20;
/** index numbers of messages to be redelivered */
private final List<Integer> _redeliverMsgs = Arrays.asList(1, 2, 5, 14);
private String _testQueueName;
+ private Queue _testDeadLetterQueue;
+ private Queue _testQueue;
@Override
public void setUp() throws Exception
@@ -84,9 +86,6 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
_awaitEmptyQueue = Long.parseLong(System.getProperty("MaxDeliveryCountTest.awaitEmptyQueue", "2500"));
_awaitCompletionTimeout = Long.parseLong(System.getProperty("MaxDeliveryCountTest.awaitCompletionTimeout", "20000"));
- setTestSystemProperty("queue.deadLetterQueueEnabled","true");
- setTestSystemProperty("queue.maximumDeliveryAttempts", String.valueOf(MAX_DELIVERY_COUNT));
-
// Set client-side flag to allow the server to determine if messages
// dead-lettered or requeued.
if (!isBroker010())
@@ -95,31 +94,28 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
}
super.setUp();
_testQueueName = getTestQueueName();
- boolean durableSub = isDurSubTest();
+ String testDeadLetterQueueName = _testQueueName + "_DLQ";
Connection connection = getConnectionBuilder().setClientId("clientid").build();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Destination destination;
- if(durableSub)
- {
- destination = createTopic(connection, _testQueueName);
- session.createDurableSubscriber((Topic)destination, getName()).close();
- }
- else
- {
- final Map<String, Object> attributes = new HashMap<>();
- attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName);
- attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
- createEntityUsingAmqpManagement(_testQueueName,
- session,
- "org.apache.qpid.StandardQueue",
- attributes);
- destination = getQueueFromName(session, _testQueueName);
- }
- MessageProducer producer = session.createProducer(destination);
+ _testDeadLetterQueue = createTestQueue(session, testDeadLetterQueueName);
+
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.NAME, _testQueueName);
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, MAX_DELIVERY_COUNT);
+ attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
+ new ObjectMapper().writeValueAsString(Collections.singletonMap(AlternateBinding.DESTINATION,
+ testDeadLetterQueueName)));
+ createEntityUsingAmqpManagement(_testQueueName,
+ session,
+ "org.apache.qpid.StandardQueue",
+ attributes);
+ _testQueue = getQueueFromName(session, _testQueueName);
+
+
+ MessageProducer producer = session.createProducer(_testQueue);
for (int count = 1; count <= MSG_COUNT; count++)
{
Message msg = session.createTextMessage(generateContent(count));
@@ -139,96 +135,57 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
return "Message " + count + " content.";
}
- /**
- * Test that Max Redelivery is enforced when using onMessage() on a
- * Client-Ack session.
- */
public void testAsynchronousClientAckSession() throws Exception
{
- doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, false);
}
- /**
- * Test that Max Redelivery is enforced when using onMessage() on a
- * transacted session.
- */
public void testAsynchronousTransactedSession() throws Exception
{
- doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, false);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false);
}
- /**
- * Test that Max Redelivery is enforced when using onMessage() on an
- * Auto-Ack session.
- */
public void testAsynchronousAutoAckSession() throws Exception
{
- doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ doTest(Session.AUTO_ACKNOWLEDGE, _redeliverMsgs, false);
}
- /**
- * Test that Max Redelivery is enforced when using onMessage() on a
- * Dups-OK session.
- */
public void testAsynchronousDupsOkSession() throws Exception
{
- doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false, false);
+ doTest(Session.DUPS_OK_ACKNOWLEDGE, _redeliverMsgs, false);
}
- /**
- * Test that Max Redelivery is enforced when using recieve() on a
- * Client-Ack session.
- */
public void testSynchronousClientAckSession() throws Exception
{
- doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true, false);
+ doTest(Session.CLIENT_ACKNOWLEDGE, _redeliverMsgs, true);
}
- /**
- * Test that Max Redelivery is enforced when using recieve() on a
- * transacted session.
- */
public void testSynchronousTransactedSession() throws Exception
{
- doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
- }
-
- public void testDurableSubscription() throws Exception
- {
- doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, false, true);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true);
}
public void testWhenBrokerIsRestartedAfterEnqeuingMessages() throws Exception
{
restartDefaultBroker();
- doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true, false);
+ doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true);
}
- private void doTest(final int deliveryMode, final List<Integer> redeliverMsgs, final boolean synchronous, final boolean durableSub) throws Exception
+ private void doTest(final int deliveryMode,
+ final List<Integer> redeliverMsgs,
+ final boolean synchronous) throws Exception
{
final Connection clientConnection = getConnectionBuilder().setClientId("clientid").build();
final boolean transacted = deliveryMode == Session.SESSION_TRANSACTED;
final Session clientSession = clientConnection.createSession(transacted, deliveryMode);
- MessageConsumer consumer;
- Destination dest = durableSub ? clientSession.createTopic(_testQueueName) : clientSession.createQueue(_testQueueName);
- Queue checkQueue;
- if(durableSub)
- {
- consumer = clientSession.createDurableSubscriber((Topic)dest, getName());
+ MessageConsumer consumer = clientSession.createConsumer(_testQueue);
- checkQueue = clientSession.createQueue(getDurableSubscriptionQueueName());
- }
- else
- {
- consumer = clientSession.createConsumer(dest);
- checkQueue = (Queue) dest;
- }
clientConnection.start();
assertEquals("The queue should have " + MSG_COUNT + " msgs at start",
- MSG_COUNT, getQueueDepth(clientConnection, checkQueue));
+ MSG_COUNT, getQueueDepth(clientConnection, _testQueue));
int expectedDeliveries = MSG_COUNT + ((MAX_DELIVERY_COUNT -1) * redeliverMsgs.size());
@@ -269,17 +226,17 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
&& clientSession.getAcknowledgeMode() != Session.CLIENT_ACKNOWLEDGE)
{
final long timeout = System.currentTimeMillis() + _awaitEmptyQueue;
- while(getQueueDepth(clientConnection, checkQueue) > 0 && System.currentTimeMillis() < timeout)
+ while(getQueueDepth(clientConnection, _testQueue) > 0 && System.currentTimeMillis() < timeout)
{
Thread.sleep(100);
}
}
//check the source queue is now empty
- assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, checkQueue));
+ assertEquals("The queue should have 0 msgs left", 0, getQueueDepth(clientConnection, _testQueue));
//check the DLQ has the required number of rejected-without-requeue messages
- verifyDLQdepth(redeliverMsgs.size(), clientSession, durableSub, clientConnection);
+ verifyDLQdepth(redeliverMsgs.size(), clientConnection);
if (!isBroker10())
{
@@ -294,62 +251,33 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
clientConnection2.start();
//verify the messages on the DLQ
- verifyDLQcontent(clientConnection2, redeliverMsgs, getTestQueueName(), durableSub);
+ verifyDLQcontent(clientConnection2, redeliverMsgs);
clientConnection2.close();
}
else
{
//verify the messages on the DLQ
- verifyDLQcontent(clientConnection, redeliverMsgs, getTestQueueName(), durableSub);
+ verifyDLQcontent(clientConnection, redeliverMsgs);
clientConnection.close();
}
}
}
- private String getDurableSubscriptionQueueName()
+ private void verifyDLQdepth(int expectedQueueDepth, final Connection clientConnection) throws Exception
{
- if ( isBroker10())
- {
- return "qpidsub_/clientid_/" + getName() + "_/durable";
- }
- else
- {
- return "clientid:" + getName();
- }
- }
-
- private void verifyDLQdepth(int expected,
- Session clientSession,
- boolean durableSub,
- final Connection clientConnection) throws Exception
- {
- String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
- + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertEquals("The DLQ should have " + expected + " msgs on it",
- expected,
- getQueueDepth(clientConnection, clientSession.createQueue(queueName)));
+ assertEquals("The DLQ should have " + expectedQueueDepth + " msgs on it",
+ expectedQueueDepth,
+ getQueueDepth(clientConnection, _testDeadLetterQueue));
}
- private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs, String destName, boolean durableSub) throws JMSException
+ private void verifyDLQcontent(Connection clientConnection, List<Integer> redeliverMsgs) throws JMSException
{
Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer;
- if(durableSub)
- {
- String queueName = (durableSub ? getDurableSubscriptionQueueName() : _testQueueName )
- + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- consumer = clientSession.createConsumer(clientSession.createQueue(queueName));
- }
- else
- {
- consumer = clientSession.createConsumer(
- clientSession.createQueue(destName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX));
- }
+ MessageConsumer consumer = clientSession.createConsumer(_testDeadLetterQueue);
//keep track of the message we expect to still be on the DLQ
List<Integer> outstandingMessages = new ArrayList<>(redeliverMsgs);
@@ -667,9 +595,4 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
}
}
}
-
- private boolean isDurSubTest()
- {
- return getTestQueueName().contains("DurableSubscription");
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index b623a83..f8b4416 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -128,6 +128,7 @@ org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionL
// These tests specifically test BURL behaviour
org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testSendingToQueuePatternBURL
org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testSendingToNonMatchingQueuePatternBURL
+org.apache.qpid.server.queue.NodeAutoCreationPolicyTest#testLegacyQueueDeclareArgumentAlternateBindingCreation
// Message encryption not currently supported by the 1.0 client
org.apache.qpid.systest.messageencryption.MessageEncryptionTest#*
@@ -204,7 +205,7 @@ org.apache.qpid.server.logging.DurableQueueLoggingTest#*
org.apache.qpid.server.logging.QueueLoggingTest#*
org.apache.qpid.server.logging.TransientQueueLoggingTest#*
-// Tests verify the 0-x client's behaviour on recover which is not applicable to new client
+// Tests call Session#recover() to redeliver messages from broker which is not applicable to new client
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testSynchronousClientAckSession
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousClientAckSession
org.apache.qpid.test.unit.client.MaxDeliveryCountTest#testAsynchronousDupsOkSession
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-broker-j git commit: QPID-7606: Remodel alternateExchange
as alternateBinding
Posted by or...@apache.org.
QPID-7606: Remodel alternateExchange as alternateBinding
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e4598dcd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e4598dcd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e4598dcd
Branch: refs/heads/master
Commit: e4598dcd6b480f3c8e80a02109acb16c8240c377
Parents: c2c2bc4
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jun 15 15:51:30 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Jun 15 15:51:50 2017 +0100
----------------------------------------------------------------------
.../server/store/berkeleydb/BDBUpgradeTest.java | 10 +-
.../qpid/server/exchange/AbstractExchange.java | 132 ++++++--
.../server/exchange/DefaultDestination.java | 17 +-
.../server/exchange/DestinationReferrer.java | 26 ++
.../qpid/server/exchange/ExchangeReferrer.java | 26 --
.../messages/Channel_logmessages.properties | 4 +-
.../qpid/server/message/MessageDestination.java | 7 +
.../qpid/server/model/AlternateBinding.java | 33 ++
.../apache/qpid/server/model/BrokerModel.java | 1 +
.../org/apache/qpid/server/model/Exchange.java | 15 +-
.../qpid/server/model/NamedAddressSpace.java | 2 +
.../org/apache/qpid/server/model/Queue.java | 11 +-
.../apache/qpid/server/queue/AbstractQueue.java | 142 ++++++--
.../server/queue/QueueArgumentsConverter.java | 45 ++-
.../qpid/server/queue/QueueEntryImpl.java | 8 +-
.../VirtualHostStoreUpgraderAndRecoverer.java | 176 +++++++++-
...stractNonConnectionAcceptingVirtualHost.java | 6 +
.../server/virtualhost/AbstractVirtualHost.java | 172 +---------
.../ExchangeIsAlternateException.java | 31 --
.../MessageDestinationIsAlternateException.java | 31 ++
.../virtualhost/QueueManagingVirtualHost.java | 15 -
.../server/exchange/DirectExchangeTest.java | 88 ++++-
.../logging/subjects/BindingLogSubjectTest.java | 3 +-
.../logging/subjects/QueueLogSubjectTest.java | 3 +-
.../server/queue/AbstractQueueTestBase.java | 89 +++++
...stractDurableConfigurationStoreTestCase.java | 148 ++-------
...irtualHostStoreUpgraderAndRecovererTest.java | 118 ++++++-
.../VirtualHostQueueCreationTest.java | 310 +-----------------
.../config/LegacyAccessControlAdapter.java | 13 +-
.../config/LegacyAccessControlAdapterTest.java | 7 +-
.../protocol/v0_10/ConsumerTarget_0_10.java | 7 +-
.../protocol/v0_10/ServerSessionDelegate.java | 71 ++--
.../qpid/server/protocol/v0_8/AMQChannel.java | 56 ++--
.../protocol/v1_0/ConsumerTarget_1_0.java | 7 +-
.../management/amqp/ManagementAddressSpace.java | 25 +-
.../server/management/amqp/ManagementNode.java | 17 +
.../management/amqp/ManagementNodeConsumer.java | 19 +-
.../management/amqp/ProxyMessageSource.java | 19 +-
.../src/main/java/resources/addExchange.html | 27 +-
.../src/main/java/resources/addQueue.html | 42 +--
.../src/main/java/resources/css/common.css | 5 +
.../src/main/java/resources/editQueue.html | 321 -------------------
.../js/qpid/common/AlternateBinding.js | 103 ++++++
.../resources/js/qpid/management/Exchange.js | 27 +-
.../java/resources/js/qpid/management/Queue.js | 13 +-
.../resources/js/qpid/management/addExchange.js | 210 ++++++++----
.../resources/js/qpid/management/addQueue.js | 306 +++++++++++-------
.../resources/js/qpid/management/editQueue.js | 204 ------------
.../src/main/java/resources/showExchange.html | 30 +-
.../src/main/java/resources/showQueue.html | 4 +-
.../org/apache/qpid/systest/rest/Asserts.java | 6 +-
.../qpid/test/utils/AmqpManagementFacade.java | 2 +-
.../queue/NodeAutoCreationPolicyTest.java | 172 +++++++++-
.../routing/AlternateBindingRoutingTest.java | 88 +++++
.../store/VirtualHostMessageStoreTest.java | 2 +-
.../qpid/systest/rest/ExchangeRestTest.java | 18 +-
.../qpid/systest/rest/VirtualHostRestTest.java | 41 ---
.../systest/rest/acl/ExchangeRestACLTest.java | 17 +-
.../test/unit/client/MaxDeliveryCountTest.java | 171 +++-------
test-profiles/Java10Excludes | 3 +-
60 files changed, 1937 insertions(+), 1785 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
index 9323492..41ff80f 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
@@ -26,6 +26,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.VirtualHostNode;
@@ -363,14 +366,15 @@ public class BDBUpgradeTest extends QpidBrokerTestCase
//verify the queue exists, has the expected alternate exchange and max delivery count
Map<String, Object> queueAttributes = getQueueAttributes(QUEUE_WITH_DLQ_NAME);
- assertEquals("Queue does not have the expected AlternateExchange", QUEUE_WITH_DLQ_NAME + "_DLE",
- (String) queueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_EXCHANGE));
+ assertEquals("Queue does not have the expected AlternateExchange",
+ new HashMap<>(Collections.singletonMap(AlternateBinding.DESTINATION, QUEUE_WITH_DLQ_NAME + "_DLE")),
+ new HashMap<>(((Map<String, Object>) queueAttributes.get(Exchange.ALTERNATE_BINDING))));
assertEquals("Unexpected maximum delivery count", 2,
((Number) queueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue());
Map<String, Object> dlQueueAttributes = getQueueAttributes(QUEUE_WITH_DLQ_NAME + "_DLQ");
assertNull("Queue should not have an AlternateExchange",
- dlQueueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_EXCHANGE));
+ dlQueueAttributes.get(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING));
assertEquals("Unexpected maximum delivery count", 0,
((Number) dlQueueAttributes.get(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS)).intValue());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index 92c5bba..aed59d4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -57,8 +57,10 @@ import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredDerivedMethodAttribute;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.DoOnConfigThread;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -78,7 +80,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.DeleteDeleteTask;
import org.apache.qpid.server.util.FixedKeyMapCreator;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -100,8 +102,8 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
private static final Operation PUBLISH_ACTION = Operation.ACTION("publish");
private final AtomicBoolean _closed = new AtomicBoolean();
- @ManagedAttributeField(beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange" )
- private Exchange<?> _alternateExchange;
+ @ManagedAttributeField(beforeSet = "preSetAlternateBinding", afterSet = "postSetAlternateBinding" )
+ private AlternateBinding _alternateBinding;
@ManagedAttributeField
private UnroutableMessageBehaviour _unroutableMessageBehaviour;
@ManagedAttributeField
@@ -116,7 +118,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
//The logSubject for ths exchange
private LogSubject _logSubject;
- private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
+ private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
private final AtomicLong _receivedMessageCount = new AtomicLong();
private final AtomicLong _receivedMessageSize = new AtomicLong();
@@ -129,6 +131,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
private final ConcurrentMap<MessageSender, Integer> _linkedSenders = new ConcurrentHashMap<>();
private final List<Action<? super Deletable<?>>> _deleteTaskList = new CopyOnWriteArrayList<>();
+ private volatile MessageDestination _alternateBindingDestination;
public AbstractExchange(Map<String, Object> attributes, QueueManagingVirtualHost<?> vhost)
{
@@ -158,6 +161,14 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
}
+ @Override
+ protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
+ {
+ super.validateChange(proxyForValidation, changedAttributes);
+
+ validateOrCreateAlternateBinding(((Exchange<?>) proxyForValidation), false);
+ }
+
private boolean isReservedExchangeName(String name)
{
return name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
@@ -175,6 +186,13 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ validateOrCreateAlternateBinding(this, true);
+ }
+
+ @Override
protected void onOpen()
{
super.onOpen();
@@ -219,6 +237,20 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
}
+ if (getAlternateBinding() != null)
+ {
+ String alternateDestination = getAlternateBinding().getDestination();
+ _alternateBindingDestination = getOpenedMessageDestination(alternateDestination);
+ if (_alternateBindingDestination != null)
+ {
+ _alternateBindingDestination.addReference(this);
+ }
+ else
+ {
+ _logger.warn("Cannot find alternate binding destination '{}' for exchange '{}'", alternateDestination, toString());
+ }
+ }
+
getEventLogger().message(ExchangeMessages.CREATED(getType(), getName(), isDurable()));
}
@@ -260,7 +292,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
{
if(hasReferrers())
{
- throw new ExchangeIsAlternateException(getName());
+ throw new MessageDestinationIsAlternateException(getName());
}
if(isReservedExchangeName(getName()))
@@ -285,9 +317,9 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
sender.destinationRemoved(this);
}
- if (_alternateExchange != null)
+ if (_alternateBindingDestination != null)
{
- _alternateExchange.removeReference(AbstractExchange.this);
+ _alternateBindingDestination.removeReference(AbstractExchange.this);
}
getEventLogger().message(_logSubject, ExchangeMessages.DELETED());
@@ -475,41 +507,51 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public Exchange<?> getAlternateExchange()
+ public AlternateBinding getAlternateBinding()
{
- return _alternateExchange;
+ return _alternateBinding;
}
- private void preSetAlternateExchange()
+ private void preSetAlternateBinding()
{
- if (_alternateExchange != null)
+ if (_alternateBindingDestination != null)
{
- _alternateExchange.removeReference(this);
+ _alternateBindingDestination.removeReference(this);
}
}
@SuppressWarnings("unused")
- private void postSetAlternateExchange()
+ private void postSetAlternateBinding()
{
- if(_alternateExchange != null)
+ if(_alternateBinding != null)
{
- _alternateExchange.addReference(this);
+ _alternateBindingDestination = _virtualHost.getAttainedMessageDestination(_alternateBinding.getDestination(), false);
+ if (_alternateBindingDestination != null)
+ {
+ _alternateBindingDestination.addReference(this);
+ }
}
}
@Override
- public void removeReference(ExchangeReferrer exchange)
+ public MessageDestination getAlternateBindingDestination()
{
- _referrers.remove(exchange);
+ return _alternateBindingDestination;
}
@Override
- public void addReference(ExchangeReferrer exchange)
+ public void removeReference(DestinationReferrer destinationReferrer)
{
- _referrers.put(exchange, Boolean.TRUE);
+ _referrers.remove(destinationReferrer);
}
- public boolean hasReferrers()
+ @Override
+ public void addReference(DestinationReferrer destinationReferrer)
+ {
+ _referrers.add(destinationReferrer);
+ }
+
+ private boolean hasReferrers()
{
return !_referrers.isEmpty();
}
@@ -578,10 +620,10 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
if (!routingResult.hasRoutes())
{
- Exchange altExchange = getAlternateExchange();
- if (altExchange != null)
+ MessageDestination alternateBindingDestination = getAlternateBindingDestination();
+ if (alternateBindingDestination != null)
{
- routingResult.add(altExchange.route(message, routingAddress, instanceProperties));
+ routingResult.add(alternateBindingDestination.route(message, routingAddress, instanceProperties));
}
}
@@ -722,20 +764,19 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
private MessageDestination getOpenedMessageDestination(final String name)
{
- MessageDestination destination = getVirtualHost().getChildByName(Queue.class, name);
+ MessageDestination destination = getVirtualHost().getSystemDestination(name);
if(destination == null)
{
- destination = getVirtualHost().getSystemDestination(name);
+ destination = getVirtualHost().getChildByName(Exchange.class, name);
}
if(destination == null)
{
- destination = getVirtualHost().getChildByName(Exchange.class, name);
+ destination = getVirtualHost().getChildByName(Queue.class, name);
}
return destination;
}
-
@Override
public boolean unbind(@Param(name = "destination", mandatory = true) final String destination,
@Param(name = "bindingKey") String bindingKey)
@@ -856,7 +897,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
private ListenableFuture<Void> doDeleteBeforeInitialize()
{
- preSetAlternateExchange();
+ preSetAlternateBinding();
setState(State.DELETED);
return Futures.immediateFuture(null);
}
@@ -868,11 +909,11 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
try
{
deleteWithChecks();
- preSetAlternateExchange();
+ preSetAlternateBinding();
setState(State.DELETED);
return Futures.immediateFuture(null);
}
- catch(ExchangeIsAlternateException | RequiredExchangeException e)
+ catch(MessageDestinationIsAlternateException | RequiredExchangeException e)
{
// let management know about constraint violations
// in order to report error back to caller
@@ -989,4 +1030,35 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
_linkedSenders.put(sender, oldValue-1);
}
}
+
+ private void validateOrCreateAlternateBinding(final Exchange<?> exchange, final boolean mayCreate)
+ {
+ Object value = exchange.getAttribute(ALTERNATE_BINDING);
+ if (value instanceof AlternateBinding)
+ {
+ AlternateBinding alternateBinding = (AlternateBinding) value;
+ String destinationName = alternateBinding.getDestination();
+ MessageDestination messageDestination =
+ _virtualHost.getAttainedMessageDestination(destinationName, mayCreate);
+ if (messageDestination == null)
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination '%s' cannot be found.",
+ getName(), destinationName));
+ }
+ else if (messageDestination == this)
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination cannot refer to self.",
+ getName()));
+ }
+ else if (isDurable() && !messageDestination.isDurable())
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination '%s' is not durable.",
+ getName(),
+ destinationName));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
index c347d55..c784c75 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java
@@ -21,7 +21,6 @@ package org.apache.qpid.server.exchange;
import java.security.AccessControlException;
import java.util.Map;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSender;
@@ -148,4 +147,20 @@ public class DefaultDestination implements MessageDestination, PermissionedObjec
{
}
+
+ @Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/exchange/DestinationReferrer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/DestinationReferrer.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/DestinationReferrer.java
new file mode 100755
index 0000000..54bca20
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/exchange/DestinationReferrer.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.exchange;
+
+public interface DestinationReferrer
+{
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java b/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java
deleted file mode 100755
index e41d63d..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.exchange;
-
-public interface ExchangeReferrer
-{
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
index c873e23..fda86b7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
+++ b/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
@@ -35,8 +35,8 @@ FLOW_REMOVED = CHN-1006 : Flow Control Removed
OPEN_TXN = CHN-1007 : Open Transaction : {0,number} ms
IDLE_TXN = CHN-1008 : Idle Transaction : {0,number} ms
-DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
-DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
+DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate binding configured for queue : {1} routing key : {2}
+DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as alternate binding yields no routes : {1}
DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
index 8cad1db..6030f87 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.message;
import java.security.AccessControlException;
import java.util.Map;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PublishingLink;
import org.apache.qpid.server.security.SecurityToken;
@@ -53,4 +54,10 @@ public interface MessageDestination extends MessageNode
void linkAdded(MessageSender sender, PublishingLink link);
void linkRemoved(MessageSender sender, PublishingLink link);
+
+ MessageDestination getAlternateBindingDestination();
+
+ void addReference(DestinationReferrer destinationReferrer);
+
+ void removeReference(DestinationReferrer destinationReferrer);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/model/AlternateBinding.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/AlternateBinding.java b/broker-core/src/main/java/org/apache/qpid/server/model/AlternateBinding.java
new file mode 100644
index 0000000..0743351
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/AlternateBinding.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Map;
+
+@ManagedAttributeValueType
+public interface AlternateBinding extends ManagedAttributeValue
+{
+ String DESTINATION = "destination";
+
+ String getDestination();
+
+ Map<String, Object> getAttributes();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
index bd4ca47..92bffb5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/BrokerModel.java
@@ -54,6 +54,7 @@ public final class BrokerModel extends Model
* 6.1 Remove JMX
* Remove PreferencesProvider
* 7.0 Remove bindings, Consumer sole parent is Queue
+ * Remodelled alternateExchange as alternateBindings
*/
public static final int MODEL_MAJOR_VERSION = 7;
public static final int MODEL_MINOR_VERSION = 0;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
index 1f30ec1..904251b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -23,26 +23,25 @@ package org.apache.qpid.server.model;
import java.util.Collection;
import java.util.Map;
-import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSender;
import org.apache.qpid.server.queue.CreatingLinkInfo;
-import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@ManagedObject( description = Exchange.CLASS_DESCRIPTION,
amqpName = "org.apache.qpid.Exchange"
)
public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, MessageDestination,
- ExchangeReferrer, MessageSender
+ DestinationReferrer, MessageSender
{
String CLASS_DESCRIPTION = "<p>An Exchange is a named entity within the Virtualhost which receives messages from "
+ "producers and routes them to matching Queues within the Virtualhost.</p>"
+ "<p>The server provides a set of exchange types with each exchange type implementing "
+ "a different routing algorithm.</p>";
- String ALTERNATE_EXCHANGE = "alternateExchange";
+ String ALTERNATE_BINDING = "alternateBinding";
String DURABLE_BINDINGS = "durableBindings";
String UNROUTABLE_MESSAGE_BEHAVIOUR = "unroutableMessageBehaviour";
String CREATING_LINK_INFO = "creatingLinkInfo";
@@ -55,7 +54,7 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
// Attributes
@ManagedAttribute
- Exchange<?> getAlternateExchange();
+ AlternateBinding getAlternateBinding();
@ManagedAttribute(description = "(AMQP 1.0 only) Default behaviour to apply when a message is not routed to any queues", defaultValue = "DISCARD")
UnroutableMessageBehaviour getUnroutableMessageBehaviour();
@@ -176,12 +175,6 @@ public interface Exchange<X extends Exchange<X>> extends ConfiguredObject<X>, Me
boolean isBound(@Param(name = "arguments") Map<String, Object> arguments,
@Param(name = "queue") Queue<?> queue);
- void removeReference(ExchangeReferrer exchange);
-
- void addReference(ExchangeReferrer exchange);
-
- boolean hasReferrers();
-
EventLogger getEventLogger();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index 370d06d..5076446 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -44,6 +44,8 @@ public interface NamedAddressSpace extends Named
MessageDestination getAttainedMessageDestination(String name);
+ MessageDestination getAttainedMessageDestination(String name, boolean mayCreate);
+
boolean registerConnection(AMQPConnection<?> connection,
final ConnectionEstablishmentPolicy connectionEstablishmentPolicy);
void deregisterConnection(AMQPConnection<?> connection);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index aefbc87..bb8e8cd 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -27,7 +27,7 @@ import java.util.Set;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.server.exchange.ExchangeReferrer;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInfo;
@@ -43,12 +43,13 @@ import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
@ManagedObject( defaultType = "standard",
amqpName = "org.apache.qpid.Queue",
description = Queue.CLASS_DESCRIPTION )
public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
- Comparable<X>, ExchangeReferrer,
+ Comparable<X>, DestinationReferrer,
BaseQueue,
MessageSource,
MessageDestination,
@@ -65,7 +66,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize";
String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes";
String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
- String ALTERNATE_EXCHANGE = "alternateExchange";
+ String ALTERNATE_BINDING = "alternateBinding";
String EXCLUSIVE = "exclusive";
String MESSAGE_DURABILITY = "messageDurability";
String MESSAGE_GROUP_KEY = "messageGroupKey";
@@ -109,7 +110,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
String DEFAULT_EXCLUSIVITY = "NONE";
@ManagedAttribute
- Exchange getAlternateExchange();
+ AlternateBinding getAlternateBinding();
@ManagedAttribute( defaultValue = "${queue.defaultExclusivityPolicy}")
ExclusivityPolicy getExclusive();
@@ -447,7 +448,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
LogSubject getLogSubject();
- VirtualHost<?> getVirtualHost();
+ QueueManagingVirtualHost<?> getVirtualHost();
boolean isUnused();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 129f9d2..f888c64 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -71,6 +71,7 @@ import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
@@ -85,6 +86,7 @@ import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageDeletedException;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.message.MessageInfoImpl;
import org.apache.qpid.server.message.MessageInstance;
@@ -118,6 +120,7 @@ import org.apache.qpid.server.util.Deletable;
import org.apache.qpid.server.util.DeleteDeleteTask;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -151,8 +154,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private QueueConsumerManagerImpl _queueConsumerManager;
- @ManagedAttributeField( beforeSet = "preSetAlternateExchange", afterSet = "postSetAlternateExchange")
- private Exchange _alternateExchange;
+ @ManagedAttributeField( beforeSet = "preSetAlternateBinding", afterSet = "postSetAlternateBinding")
+ private AlternateBinding _alternateBinding;
private volatile QueueConsumer<?,?> _exclusiveSubscriber;
@@ -253,17 +256,18 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private final AtomicInteger _recovering = new AtomicInteger(RECOVERING);
private final AtomicInteger _enqueuingWhileRecovering = new AtomicInteger(0);
-
private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
-
- private boolean _closing;
private final ConcurrentMap<String, Callable<MessageFilter>> _defaultFiltersMap = new ConcurrentHashMap<>();
private final List<HoldMethod> _holdMethods = new CopyOnWriteArrayList<>();
+ private final Set<DestinationReferrer> _referrers = Collections.newSetFromMap(new ConcurrentHashMap<DestinationReferrer,Boolean>());
+
+ private boolean _closing;
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
private volatile OverflowPolicyHandler _overflowPolicyHandler;
private long _flowToDiskThreshold;
+ private volatile MessageDestination _alternateBindingDestination;
private interface HoldMethod
{
@@ -314,6 +318,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
});
}
+ validateOrCreateAlternateBinding(this, true);
_recovering.set(RECOVERED);
}
@@ -556,6 +561,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
});
}
+ if (getAlternateBinding() != null)
+ {
+ String alternateDestination = getAlternateBinding().getDestination();
+ _alternateBindingDestination = getOpenedMessageDestination(alternateDestination);
+ if (_alternateBindingDestination != null)
+ {
+ _alternateBindingDestination.addReference(this);
+ }
+ else
+ {
+ _logger.warn("Cannot find alternate binding destination '{}' for queue '{}'", alternateDestination, toString());
+ }
+ }
+
updateAlertChecks();
}
@@ -572,6 +591,21 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
false);
}
+ private MessageDestination getOpenedMessageDestination(final String name)
+ {
+ MessageDestination destination = getVirtualHost().getSystemDestination(name);
+ if(destination == null)
+ {
+ destination = getVirtualHost().getChildByName(Exchange.class, name);
+ }
+
+ if(destination == null)
+ {
+ destination = getVirtualHost().getChildByName(Queue.class, name);
+ }
+ return destination;
+ }
+
private void addLifetimeConstraint(final Deletable<? extends Deletable> lifetimeObject)
{
final Action<Deletable> deleteQueueTask = object -> Subject.doAs(getSubjectWithAddedSystemRights(),
@@ -621,35 +655,46 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _exclusive != ExclusivityPolicy.NONE;
}
- public Exchange<?> getAlternateExchange()
+ @Override
+ public AlternateBinding getAlternateBinding()
{
- return _alternateExchange;
+ return _alternateBinding;
}
- public void setAlternateExchange(Exchange<?> exchange)
+ public void setAlternateBinding(AlternateBinding alternateBinding)
{
- _alternateExchange = exchange;
+ _alternateBinding = alternateBinding;
}
@SuppressWarnings("unused")
- private void postSetAlternateExchange()
+ private void postSetAlternateBinding()
{
- if(_alternateExchange != null)
+ if(_alternateBinding != null)
{
- _alternateExchange.addReference(this);
+ _alternateBindingDestination = _virtualHost.getAttainedMessageDestination(_alternateBinding.getDestination(), false);
+ if (_alternateBindingDestination != null)
+ {
+ _alternateBindingDestination.addReference(this);
+ }
}
}
@SuppressWarnings("unused")
- private void preSetAlternateExchange()
+ private void preSetAlternateBinding()
{
- if(_alternateExchange != null)
+ if(_alternateBindingDestination != null)
{
- _alternateExchange.removeReference(this);
+ _alternateBindingDestination.removeReference(this);
}
}
@Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return _alternateBindingDestination;
+ }
+
+ @Override
public Map<String, Map<String, List<String>>> getDefaultFilters()
{
return _defaultFilters;
@@ -724,7 +769,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _creatingLinkInfo;
}
- public VirtualHost<?> getVirtualHost()
+ public QueueManagingVirtualHost<?> getVirtualHost()
{
return _virtualHost;
}
@@ -1689,6 +1734,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if (_deleted.compareAndSet(false, true))
{
+ if(hasReferrers())
+ {
+ throw new MessageDestinationIsAlternateException(getName());
+ }
+
final int queueDepthMessages = getQueueDepthMessages();
for(MessageSender sender : _linkedSenders.keySet())
@@ -1715,8 +1765,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
routeToAlternate(entries);
- preSetAlternateExchange();
- _alternateExchange = null;
+ preSetAlternateBinding();
+ _alternateBinding = null;
_stopped.set(true);
_queueHouseKeepingTask.cancel();
@@ -2818,7 +2868,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.DELETED)
private ListenableFuture<Void> doDeleteBeforeInitialize()
{
- preSetAlternateExchange();
+ preSetAlternateBinding();
setState(State.DELETED);
return Futures.immediateFuture(null);
}
@@ -2988,6 +3038,11 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
}
+
+ if (changedAttributes.contains(ALTERNATE_BINDING))
+ {
+ validateOrCreateAlternateBinding(queue, false);
+ }
}
@Override
@@ -3176,6 +3231,23 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return getEntries().getLeastSignificantOldestEntry();
}
+ @Override
+ public void removeReference(DestinationReferrer destinationReferrer)
+ {
+ _referrers.remove(destinationReferrer);
+ }
+
+ @Override
+ public void addReference(DestinationReferrer destinationReferrer)
+ {
+ _referrers.add(destinationReferrer);
+ }
+
+ private boolean hasReferrers()
+ {
+ return !_referrers.isEmpty();
+ }
+
private class MessageFinder implements QueueEntryVisitor
{
private final long _messageNumber;
@@ -3353,4 +3425,36 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
_bindingCount--;
}
+
+ private void validateOrCreateAlternateBinding(final Queue<?> queue, final boolean mayCreate)
+ {
+ Object value = queue.getAttribute(ALTERNATE_BINDING);
+ if (value instanceof AlternateBinding)
+ {
+ AlternateBinding alternateBinding = (AlternateBinding) value;
+ String destinationName = alternateBinding.getDestination();
+ MessageDestination messageDestination =
+ _virtualHost.getAttainedMessageDestination(destinationName, mayCreate);
+ if (messageDestination == null)
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination '%s' cannot be found.",
+ getName(), destinationName));
+ }
+ else if (messageDestination == this)
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination cannot refer to self.",
+ getName()));
+ }
+ else if (isDurable() && !messageDestination.isDurable())
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot create alternate binding for '%s' : Alternate binding destination '%s' is not durable.",
+ getName(),
+ destinationName));
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index dc16f62..aaf9bd1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -20,18 +20,24 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
public class QueueArgumentsConverter
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueueArgumentsConverter.class);
+
public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
public static final String X_QPID_CAPACITY = "x-qpid-capacity";
public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
@@ -79,6 +85,10 @@ public class QueueArgumentsConverter
static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+ private static final String ALTERNATE_EXCHANGE = "alternateExchange";
+ private static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
+ private static String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
+
static
{
ATTRIBUTE_MAPPINGS.put(X_QPID_MINIMUM_ALERT_REPEAT_GAP, Queue.ALERT_REPEAT_GAP);
@@ -100,10 +110,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_PRIORITIES, PriorityQueue.PRIORITIES);
ATTRIBUTE_MAPPINGS.put(X_QPID_DESCRIPTION, Queue.DESCRIPTION);
- ATTRIBUTE_MAPPINGS.put(Queue.ALTERNATE_EXCHANGE, Queue.ALTERNATE_EXCHANGE);
-
- ATTRIBUTE_MAPPINGS.put(X_QPID_DLQ_ENABLED, AbstractVirtualHost.CREATE_DLQ_ON_CREATION);
ATTRIBUTE_MAPPINGS.put(QPID_GROUP_HEADER_KEY, Queue.MESSAGE_GROUP_KEY);
ATTRIBUTE_MAPPINGS.put(QPID_DEFAULT_MESSAGE_GROUP_ARG, Queue.MESSAGE_GROUP_DEFAULT_GROUP);
@@ -121,7 +128,8 @@ public class QueueArgumentsConverter
}
- public static Map<String,Object> convertWireArgsToModel(Map<String,Object> wireArguments)
+ public static Map<String,Object> convertWireArgsToModel(final String queueName,
+ Map<String, Object> wireArguments)
{
Map<String,Object> modelArguments = new HashMap<String, Object>();
if(wireArguments != null)
@@ -147,10 +155,7 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))));
}
- if(wireArguments.get(X_QPID_DLQ_ENABLED) != null)
- {
- modelArguments.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.parseBoolean(wireArguments.get(X_QPID_DLQ_ENABLED).toString()));
- }
+
if(wireArguments.get(QPID_NO_LOCAL) != null)
{
@@ -177,6 +182,23 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
}
+ if (wireArguments.get(ALTERNATE_EXCHANGE) != null)
+ {
+ modelArguments.put(Queue.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION,
+ wireArguments.get(ALTERNATE_EXCHANGE)));
+ }
+ else if (wireArguments.containsKey(X_QPID_DLQ_ENABLED))
+ {
+ Object argument = wireArguments.get(X_QPID_DLQ_ENABLED);
+ if ((argument instanceof Boolean && ((Boolean) argument).booleanValue())
+ || (argument instanceof String && Boolean.parseBoolean((String)argument)))
+ {
+ modelArguments.put(Queue.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION,
+ getDeadLetterQueueName(queueName)));
+ }
+ }
}
return modelArguments;
}
@@ -209,4 +231,9 @@ public class QueueArgumentsConverter
return wireArguments;
}
+
+ private static String getDeadLetterQueueName(String name)
+ {
+ return name + System.getProperty(PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6c68be1..3e36ef1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -33,11 +33,13 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDeletedException;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -573,7 +575,7 @@ public abstract class QueueEntryImpl implements QueueEntry
}
final Queue<?> currentQueue = getQueue();
- Exchange<?> alternateExchange = currentQueue.getAlternateExchange();
+ MessageDestination alternateBindingDestination = currentQueue.getAlternateBindingDestination();
boolean autocommit = txn == null;
if(autocommit)
@@ -582,9 +584,9 @@ public abstract class QueueEntryImpl implements QueueEntry
}
RoutingResult result;
- if (alternateExchange != null)
+ if (alternateBindingDestination != null)
{
- result = alternateExchange.route(getMessage(), getMessage().getInitialRoutingAddress(),
+ result = alternateBindingDestination.route(getMessage(), getMessage().getInitialRoutingAddress(),
getInstanceProperties());
}
else
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index d585a0b..84071bf 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -23,11 +23,13 @@ package org.apache.qpid.server.store;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.UUID;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
@@ -42,10 +44,8 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
-import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.util.FixedKeyMapCreator;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationStoreUpgraderAndRecoverer
{
@@ -258,24 +258,26 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
@Override
public void configuredObject(ConfiguredObjectRecord record)
{
- if("VirtualHost".equals(record.getType()))
+ if ("VirtualHost".equals(record.getType()))
{
upgradeRootRecord(record);
}
- else if("Queue".equals(record.getType()))
+ else if ("Queue".equals(record.getType()))
{
Map<String, Object> newAttributes = new LinkedHashMap<String, Object>();
- if(record.getAttributes().get(ARGUMENTS) instanceof Map)
+ if (record.getAttributes().get(ARGUMENTS) instanceof Map)
{
- newAttributes.putAll(QueueArgumentsConverter.convertWireArgsToModel((Map<String, Object>) record.getAttributes()
- .get(ARGUMENTS)));
+ newAttributes.putAll(convertWireArgsToModel((Map<String, Object>) record.getAttributes()
+ .get(ARGUMENTS)));
}
newAttributes.putAll(record.getAttributes());
- record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), newAttributes, record.getParents());
+ record = new ConfiguredObjectRecordImpl(record.getId(),
+ record.getType(),
+ newAttributes,
+ record.getParents());
getUpdateMap().put(record.getId(), record);
}
-
}
@Override
@@ -283,6 +285,65 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
{
}
+ private final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<>();
+
+ {
+ ATTRIBUTE_MAPPINGS.put("x-qpid-minimum-alert-repeat-gap", "alertRepeatGap");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-age", "alertThresholdMessageAge");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-size", "alertThresholdMessageSize");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-message-count", "alertThresholdQueueDepthMessages");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-queue-depth", "alertThresholdQueueDepthBytes");
+ ATTRIBUTE_MAPPINGS.put("qpid.alert_count", "alertThresholdQueueDepthMessages");
+ ATTRIBUTE_MAPPINGS.put("qpid.alert_size", "alertThresholdQueueDepthBytes");
+ ATTRIBUTE_MAPPINGS.put("qpid.alert_repeat_gap", "alertRepeatGap");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-maximum-delivery-count", "maximumDeliveryAttempts");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-capacity", "queueFlowControlSizeBytes");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-flow-resume-capacity", "queueFlowResumeSizeBytes");
+ ATTRIBUTE_MAPPINGS.put("qpid.queue_sort_key", "sortKey");
+ ATTRIBUTE_MAPPINGS.put("qpid.last_value_queue_key", "lvqKey");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-priorities", "priorities");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-description", "description");
+ ATTRIBUTE_MAPPINGS.put("x-qpid-dlq-enabled", "x-qpid-dlq-enabled");
+ ATTRIBUTE_MAPPINGS.put("qpid.group_header_key", "messageGroupKey");
+ ATTRIBUTE_MAPPINGS.put("qpid.default-message-group", "messageGroupDefaultGroup");
+ ATTRIBUTE_MAPPINGS.put("no-local", "noLocal");
+ ATTRIBUTE_MAPPINGS.put("qpid.message_durability", "messageDurability");
+ }
+
+ private Map<String, Object> convertWireArgsToModel(Map<String, Object> wireArguments)
+ {
+ Map<String, Object> modelArguments = new HashMap<>();
+ if (wireArguments != null)
+ {
+ for (Map.Entry<String, String> entry : ATTRIBUTE_MAPPINGS.entrySet())
+ {
+ if (wireArguments.containsKey(entry.getKey()))
+ {
+ modelArguments.put(entry.getValue(), wireArguments.get(entry.getKey()));
+ }
+ }
+ if (wireArguments.containsKey("qpid.last_value_queue")
+ && !wireArguments.containsKey("qpid.last_value_queue_key"))
+ {
+ modelArguments.put("lvqKey", "qpid.LVQ_key");
+ }
+ if (wireArguments.containsKey("qpid.shared_msg_group"))
+ {
+ modelArguments.put("messageGroupSharedGroups",
+ "1".equals(String.valueOf(wireArguments.get("qpid.shared_msg_group"))));
+ }
+ if (wireArguments.get("x-qpid-dlq-enabled") != null)
+ {
+ modelArguments.put("x-qpid-dlq-enabled",
+ Boolean.parseBoolean(wireArguments.get("x-qpid-dlq-enabled").toString()));
+ }
+ if (wireArguments.get("no-local") != null)
+ {
+ modelArguments.put("noLocal", Boolean.parseBoolean(wireArguments.get("no-local").toString()));
+ }
+ }
+ return modelArguments;
+ }
}
/*
@@ -429,7 +490,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
throw new IllegalConfigurationException("Queue name is not found in queue configuration entry attributes: " + attributes);
}
- String dleSuffix = System.getProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DEFAULT_DLE_NAME_SUFFIX);
+ String dleSuffix = System.getProperty("qpid.broker_dead_letter_exchange_suffix", DEFAULT_DLE_NAME_SUFFIX);
String dleExchangeName = queueName + dleSuffix;
ConfiguredObjectRecord exchangeRecord = findConfiguredObjectRecordInUpdateMap("Exchange", dleExchangeName);
@@ -553,6 +614,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
private final Map<UUID, ConfiguredObjectRecord> _exchanges = new HashMap<>();
private final Map<UUID, String> _queues = new HashMap<>();
private final Map<String, List<Map<String,Object>>> _queueBindings = new HashMap<>();
+ private Set<UUID> _destinationsWithAlternateExchange = new HashSet<>();
public Upgrader_6_1_to_7_0()
@@ -565,7 +627,26 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
{
if("VirtualHost".equals(record.getType()))
{
- upgradeRootRecord(record);
+ record = upgradeRootRecord(record);
+ Map<String, Object> attributes = new HashMap<>(record.getAttributes());
+ boolean modified = attributes.remove("queue_deadLetterQueueEnabled") != null;
+ Object context = attributes.get("context");
+ if(context instanceof Map)
+ {
+ Map<String,Object> contextMap = new HashMap<>((Map<String,Object>) context);
+ modified |= contextMap.remove("queue.deadLetterQueueEnabled") != null;
+ if (modified)
+ {
+ attributes.put("context", contextMap);
+ }
+ }
+
+ if (modified)
+ {
+ record = new ConfiguredObjectRecordImpl(record.getId(), record.getType(), attributes, record.getParents());
+ getUpdateMap().put(record.getId(), record);
+ }
+
}
else if("Binding".equals(record.getType()))
{
@@ -604,6 +685,13 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
existingBinding.get("arguments")));
}
}
+
+ if (record.getAttributes().containsKey("alternateExchange"))
+ {
+ _destinationsWithAlternateExchange.add(record.getId());
+
+ getUpdateMap().put(record.getId(), record);
+ }
}
else if("Queue".equals(record.getType()))
{
@@ -642,6 +730,14 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
}
}
+ boolean addToUpdateMap = false;
+ if (attributes.containsKey("alternateExchange"))
+ {
+ _destinationsWithAlternateExchange.add(record.getId());
+ addToUpdateMap = true;
+
+ }
+
if(attributes.containsKey("bindings"))
{
_queueBindings.put(String.valueOf(attributes.get("name")),
@@ -651,7 +747,7 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
_queues.put(record.getId(), (String) attributes.get("name"));
- if (!attributes.equals(new HashMap<>(record.getAttributes())))
+ if (!attributes.equals(new HashMap<>(record.getAttributes())) || addToUpdateMap)
{
getUpdateMap().put(record.getId(),
new ConfiguredObjectRecordImpl(record.getId(),
@@ -748,6 +844,62 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
}
}
+
+ for (UUID recordId : _destinationsWithAlternateExchange)
+ {
+ ConfiguredObjectRecord record = getUpdateMap().get(recordId);
+ Map<String, Object> attributes = new HashMap<>(record.getAttributes());
+
+ String exchangeNameOrUuid = String.valueOf(attributes.remove("alternateExchange"));
+
+ ConfiguredObjectRecord exchangeRecord = getExchangeFromNameOrUUID(exchangeNameOrUuid);
+ if (exchangeRecord != null)
+ {
+ attributes.put("alternateBinding",
+ Collections.singletonMap("destination", exchangeRecord.getAttributes().get("name")));
+ }
+ else
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot upgrade record UUID '%s' as cannot find exchange with name or UUID '%s'",
+ recordId,
+ exchangeNameOrUuid));
+ }
+
+ getUpdateMap().put(record.getId(),
+ new ConfiguredObjectRecordImpl(record.getId(),
+ record.getType(),
+ attributes,
+ record.getParents()));
+ }
+ }
+
+ private ConfiguredObjectRecord getExchangeFromNameOrUUID(final String exchangeNameOrUuid)
+ {
+ for(ConfiguredObjectRecord record : _exchanges.values())
+ {
+ if(exchangeNameOrUuid.equals(record.getAttributes().get("name")))
+ {
+ return record;
+ }
+ else
+ {
+ try
+ {
+ UUID uuid = UUID.fromString(exchangeNameOrUuid);
+ if (uuid.equals(record.getId()))
+ {
+ return record;
+ }
+ }
+ catch (IllegalArgumentException e)
+ {
+ // ignore - not a UUID
+ }
+ }
+ }
+
+ return null;
}
private UUID getExchangeIdFromNameOrId(final String exchange)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index 53c4b96..5be1888 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -199,6 +199,12 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
}
@Override
+ public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
+ {
+ return null;
+ }
+
+ @Override
protected void logOperation(final String operation)
{
getAncestor(Broker.class).getEventLogger().message(VirtualHostMessages.OPERATION(operation));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 89344a8..8647e1c 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -47,7 +47,6 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -145,9 +144,7 @@ import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.HousekeepingExecutor;
-import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.Strings;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
@@ -166,10 +163,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
- public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
- public static final String DLQ_ROUTING_KEY = "dlq";
- public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
- private static final int MAX_LENGTH = 255;
private static final Logger _logger = LoggerFactory.getLogger(AbstractVirtualHost.class);
@@ -842,24 +835,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
Map<String, Object> attributes)
{
checkVHostStateIsActive();
- if(childClass == Exchange.class)
- {
- return (ListenableFuture<C>) addExchangeAsync(attributes);
- }
- else if(childClass == Queue.class)
- {
- return (ListenableFuture<C>) addQueueAsync(attributes);
-
- }
- else if(childClass == VirtualHostAlias.class)
- {
- throw new UnsupportedOperationException();
- }
- else if(childClass == VirtualHostLogger.class || childClass == VirtualHostAccessControlProvider.class)
+ if(childClass == Exchange.class || childClass == Queue.class || childClass == VirtualHostLogger.class || childClass == VirtualHostAccessControlProvider.class)
{
return getObjectFactory().createAsync(childClass, attributes, this);
}
+
throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
}
@@ -1364,28 +1345,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _broker;
}
- private ListenableFuture<? extends Queue<?>> addQueueAsync(Map<String, Object> attributes)
- {
- if (shouldCreateDLQ(attributes))
- {
- // TODO - this isn't really correct - what if the name has ${foo} in it?
- String queueName = String.valueOf(attributes.get(Queue.NAME));
- validateDLNames(queueName);
- String altExchangeName = createDLQ(queueName);
- attributes = new LinkedHashMap<String, Object>(attributes);
- attributes.put(Queue.ALTERNATE_EXCHANGE, altExchangeName);
- }
- return Futures.immediateFuture(addQueueWithoutDLQ(attributes));
- }
-
- private Queue<?> addQueueWithoutDLQ(Map<String, Object> attributes)
+ @Override
+ public MessageDestination getAttainedMessageDestination(final String name)
{
- return (Queue) getObjectFactory().create(Queue.class, attributes, this);
+ return getAttainedMessageDestination(name, true);
}
-
@Override
- public MessageDestination getAttainedMessageDestination(final String name)
+ public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
{
MessageDestination destination = _systemNodeDestinations.get(name);
if(destination == null)
@@ -1396,9 +1363,8 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
{
destination = getAttainedChildFromAddress(Queue.class, name);
}
- if(destination == null)
+ if(destination == null && mayCreate)
{
-
destination = autoCreateNode(name, MessageDestination.class, true);
}
return destination;
@@ -2127,12 +2093,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
- public boolean isQueue_deadLetterQueueEnabled()
- {
- return _queue_deadLetterQueueEnabled;
- }
-
- @Override
public long getHousekeepingCheckPeriod()
{
return _housekeepingCheckPeriod;
@@ -2336,124 +2296,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
});
}
- private String createDLQ(final String queueName)
- {
- final String dlExchangeName = getDeadLetterExchangeName(queueName);
- final String dlQueueName = getDeadLetterQueueName(queueName);
-
- Exchange<?> dlExchange = null;
- final UUID dlExchangeId = UUID.randomUUID();
-
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(org.apache.qpid.server.model.Exchange.ID, dlExchangeId);
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, dlExchangeName);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
- false ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
- dlExchange = (Exchange<?>) createChild(Exchange.class, attributes);;
- }
- catch(AbstractConfiguredObject.DuplicateNameException e)
- {
- // We're ok if the exchange already exists
- dlExchange = (Exchange<?>) e.getExisting();
- }
- catch (ReservedExchangeNameException | NoFactoryForTypeException | UnknownConfiguredObjectException e)
- {
- throw new ConnectionScopedRuntimeException("Attempt to create an alternate exchange for a queue failed",e);
- }
-
- Queue<?> dlQueue = null;
-
- {
- dlQueue = (Queue<?>) getChildByName(Queue.class, dlQueueName);
-
- if(dlQueue == null)
- {
- //set args to disable DLQ-ing/MDC from the DLQ itself, preventing loops etc
- final Map<String, Object> args = new HashMap<String, Object>();
- args.put(CREATE_DLQ_ON_CREATION, false);
- args.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 0);
-
- args.put(Queue.ID, UUID.randomUUID());
- args.put(Queue.NAME, dlQueueName);
- args.put(Queue.DURABLE, true);
- dlQueue = addQueueWithoutDLQ(args);
- childAdded(dlQueue);
-
- }
- }
-
- //ensure the queue is bound to the exchange
- if(!dlExchange.isBound(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue))
- {
- //actual routing key used does not matter due to use of fanout exchange,
- //but we will make the key 'dlq' as it can be logged at creation.
- dlExchange.addBinding(AbstractVirtualHost.DLQ_ROUTING_KEY, dlQueue, null);
- }
- return dlExchangeName;
- }
-
- private static void validateDLNames(String name)
- {
- // check if DLQ name and DLQ exchange name do not exceed 255
- String exchangeName = getDeadLetterExchangeName(name);
- if (exchangeName.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("DL exchange name '" + exchangeName
- + "' length exceeds limit of " + MAX_LENGTH + " characters for queue " + name);
- }
- String queueName = getDeadLetterQueueName(name);
- if (queueName.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("DLQ queue name '" + queueName + "' length exceeds limit of "
- + MAX_LENGTH + " characters for queue " + name);
- }
- }
-
- private boolean shouldCreateDLQ(Map<String, Object> arguments)
- {
-
- boolean autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
- Queue.LIFETIME_POLICY,
- arguments,
- LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
-
- //feature is not to be enabled for temporary queues or when explicitly disabled by argument
- if (!(autoDelete || (arguments != null && arguments.containsKey(Queue.ALTERNATE_EXCHANGE))))
- {
- boolean dlqArgumentPresent = arguments != null
- && arguments.containsKey(CREATE_DLQ_ON_CREATION);
- if (dlqArgumentPresent)
- {
- boolean dlqEnabled = true;
- if (dlqArgumentPresent)
- {
- Object argument = arguments.get(CREATE_DLQ_ON_CREATION);
- dlqEnabled = (argument instanceof Boolean && ((Boolean)argument).booleanValue())
- || (argument instanceof String && Boolean.parseBoolean(argument.toString()));
- }
- return dlqEnabled;
- }
- return isQueue_deadLetterQueueEnabled();
- }
- return false;
- }
-
- private static String getDeadLetterQueueName(String name)
- {
- return name + System.getProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX);
- }
-
- private static String getDeadLetterExchangeName(String name)
- {
- return name + System.getProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX);
- }
-
@Override
public String getModelVersion()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
deleted file mode 100644
index 22a9ba1..0000000
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.model.IntegrityViolationException;
-
-public class ExchangeIsAlternateException extends IntegrityViolationException
-{
- public ExchangeIsAlternateException(String name)
- {
- super("Exchange '" + name + "' is in use as an alternate exchange");
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageDestinationIsAlternateException.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageDestinationIsAlternateException.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageDestinationIsAlternateException.java
new file mode 100644
index 0000000..88690f3
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageDestinationIsAlternateException.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import org.apache.qpid.server.model.IntegrityViolationException;
+
+public class MessageDestinationIsAlternateException extends IntegrityViolationException
+{
+ public MessageDestinationIsAlternateException(String name)
+ {
+ super("Destination '" + name + "' is in use as an alternate binding");
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
index 2b9717c..3d4ee58 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueManagingVirtualHost.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.virtualhost;
-import static org.apache.qpid.server.model.Initialization.materialize;
-
import java.security.AccessControlContext;
import java.util.Collection;
import java.util.List;
@@ -75,15 +73,6 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
String NODE_AUTO_CREATION_POLICIES = "nodeAutoCreationPolicies";
- String QUEUE_DEAD_LETTER_QUEUE_ENABLED = "queue.deadLetterQueueEnabled";
-
- @ManagedContextDefault( name = "queue.deadLetterQueueEnabled")
- boolean DEFAULT_DEAD_LETTER_QUEUE_ENABLED = false;
-
- String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
- String PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX = "qpid.broker_dead_letter_exchange_suffix";
- String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
-
@ManagedContextDefault( name = "virtualhost.housekeepingCheckPeriod")
long DEFAULT_HOUSEKEEPING_CHECK_PERIOD = 30000l;
@@ -136,9 +125,6 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
long getStoreTransactionOpenTimeoutWarn();
- @ManagedAttribute( defaultValue = "${queue.deadLetterQueueEnabled}", initialization = materialize)
- boolean isQueue_deadLetterQueueEnabled();
-
@ManagedAttribute( defaultValue = "${virtualhost.housekeepingCheckPeriod}")
long getHousekeepingCheckPeriod();
@@ -287,7 +273,6 @@ public interface QueueManagingVirtualHost<X extends QueueManagingVirtualHost<X>>
Queue<?> getAttainedQueue(String name);
-
String getLocalAddress(String routingAddress);
<T extends ConfiguredObject<?>> T getAttainedChildFromAddress(Class<T> childClass,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-broker-j git commit: QPID-7606: Remodel alternateExchange
as alternateBinding
Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index c4f5515..020eb8c 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -55,6 +55,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.AMQMessageHeader;
@@ -383,6 +384,22 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
}
+ @Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
private synchronized void enqueue(InternalMessage message,
InstanceProperties properties,
Action<? super MessageInstance> postEnqueueAction)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index b6bcb64..10e9491 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -28,7 +28,9 @@ import java.util.Map;
import java.util.UUID;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
@@ -38,7 +40,6 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.PublishingLink;
-import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.session.AMQPSession;
@@ -189,6 +190,22 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
}
@Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
public T getTarget()
{
return _target;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 7df8887..6be1fb3 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageContainer;
@@ -89,7 +90,7 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
public void authorisePublish(final SecurityToken token, final Map<String, Object> arguments)
throws AccessControlException
{
- throw new AccessControlException("Sending messages to temporary addresses in a management addres spaceis not supported");
+ throw new AccessControlException("Sending messages to temporary addresses in a management address space is not supported");
}
@Override
@@ -119,6 +120,22 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
}
@Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
public UUID getId()
{
return _id;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/addExchange.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/addExchange.html b/broker-plugins/management-http/src/main/java/resources/addExchange.html
index 25e9752..0d4eb37 100644
--- a/broker-plugins/management-http/src/main/java/resources/addExchange.html
+++ b/broker-plugins/management-http/src/main/java/resources/addExchange.html
@@ -19,7 +19,7 @@
-
-->
<div class="dijitHidden">
- <div data-dojo-type="dijit.Dialog" style="width:600px;" data-dojo-props="title:'Add Exchange'" id="addExchange">
+ <div data-dojo-type="dijit.Dialog" style="width:600px;" data-dojo-props="title:'Exchange'" id="addExchange">
<form id="formAddExchange" method="post" dojoType="dijit.form.Form">
<div class="clear">
<div class="formLabel-labelCell">Name*:</div>
@@ -62,12 +62,31 @@
checked: true"/>
</div>
</div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Alternate Binding:</div>
+ <div class="formLabel-controlCell">
+ <input type="select" id="formAddExchange.alternateBinding"
+ data-dojo-type="qpid/common/AlternateBinding"
+ data-dojo-props="
+ name: 'alternateBinding',
+ value: ' ',
+ placeHolder: 'alternate binding',
+ promptMessage: 'Alternate binding to redirect messages to',
+ title: 'Select an alternate binding to redirect messages to'"/>
+ </div>
+ </div>
-
- <div class="clear"></div>
+ <div class="editorPanel clear"
+ data-dojo-type="dijit/TitlePane"
+ data-dojo-props="title: 'Context variables', open: false">
+ <div id="formAddExchange.context"
+ data-dojo-type="qpid.common.ContextVariablesEditor"
+ data-dojo-props="name: 'context', title: 'Context variables'"></div>
+ </div>
<div class="dijitDialogPaneActionBar qpidDialogPaneActionBar">
- <input type="submit" value="Create Exchange" label="Create Exchange" dojoType="dijit.form.Button" />
+ <input type="submit" data-dojo-type="dijit/form/Button" id="formAddExchange.saveButton" data-dojo-props="label: 'Save'"/>
+ <button data-dojo-type="dijit/form/Button" id="formAddExchange.cancelButton" data-dojo-props="label: 'Cancel'" ></button>
</div>
</form>
</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/addQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/addQueue.html b/broker-plugins/management-http/src/main/java/resources/addQueue.html
index 8f2929a..7eb64db 100644
--- a/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -19,9 +19,10 @@
-
-->
<div class="dijitHidden">
- <div data-dojo-type="dijit.Dialog" data-dojo-props="title:'Add Queue'" id="addQueue">
+ <div data-dojo-type="dijit.Dialog" data-dojo-props="title:'Queue'" id="addQueue">
<form id="formAddQueue" method="post" dojoType="dijit.form.Form">
+ <div class="editNoteBanner" id="addQueue.editNoteBanner">NOTE: All changes will only take effect after Virtual Host restart.</div>
<div class="clear">
<div class="formLabel-labelCell">Queue Name*:</div>
<div class="formLabel-controlCell">
@@ -292,26 +293,27 @@
data-dojo-props="
name: 'maximumDeliveryAttempts',
placeHolder: 'number of retries',
- promptMessage: 'Maximum number of delivery attempts before the message will be sent to the alternate exchange',
- title: 'Enter the maximum number of delivery attempts before the message will be sent to the alternate exchange',
+ promptMessage: 'Maximum number of delivery attempts before the message will be sent to the alternate binding',
+ title: 'Enter the maximum number of delivery attempts before the message will be sent to the alternate binding',
trim: true"/>
</div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Create DLQ?</div>
+ <div class="formLabel-labelCell">Alternate Binding:</div>
<div class="formLabel-controlCell">
- <input type="checkbox" id="formAddQueue.dlqEnabled"
- dojoType="dijit.form.CheckBox"
+ <input type="select" id="formAddQueue.alternateBinding"
+ data-dojo-type="qpid/common/AlternateBinding"
data-dojo-props="
- name: 'x-qpid-dlq-enabled',
- value: 'dlqEnabled',
- title: 'Controls where a dead letter queue is automatically created',
- checked: false"/>
+ name: 'alternateBinding',
+ value: ' ',
+ placeHolder: 'alternate binding',
+ promptMessage: 'Alternate binding to redirect messages to',
+ title: 'Select an alternate binding to redirect messages on queue deletion or exceeding of maximum delivery retries'"/>
</div>
</div>
<div class="clear"></div>
- <div class="infoMessage">Configuring maximum delivery retries on a queue which has no DLQ or alternate <br/>exchange will result in messages being discarded after the limit is reached.</div>
+ <div class="infoMessage">Configuring maximum delivery retries on a queue which has no alternate binding (DLQ or exchange) <br/> will result in messages being discarded after the limit is reached.</div>
<div class="clear">
<div class="formLabel-labelCell">Message Group Key:</div>
@@ -354,14 +356,18 @@
<div class="clear"></div>
</div>
- <div class="clear" data-dojo-type="dijit/TitlePane" data-dojo-props="title: 'Context variables', open: false">
- <div id="formAddQueue.context" ></div>
- </div>
+ <div class="editorPanel clear"
+ data-dojo-type="dijit/TitlePane"
+ data-dojo-props="title: 'Context variables', open: false">
+ <div id="formAddQueue.context"
+ data-dojo-type="qpid.common.ContextVariablesEditor"
+ data-dojo-props="name: 'context', title: 'Context variables'"></div>
+ </div>
- <div class="dijitDialogPaneActionBar qpidDialogPaneActionBar">
- <!-- submit buttons -->
- <input type="submit" value="Create Queue" label="Create Queue" dojoType="dijit.form.Button" />
- </div>
+ <div class="dijitDialogPaneActionBar qpidDialogPaneActionBar">
+ <input type="submit" data-dojo-type="dijit/form/Button" id="formAddQueue.saveButton" data-dojo-props="label: 'Save'"/>
+ <button data-dojo-type="dijit/form/Button" id="formAddQueue.cancelButton" data-dojo-props="label: 'Cancel'" ></button>
+ </div>
</form>
</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/css/common.css
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/css/common.css b/broker-plugins/management-http/src/main/java/resources/css/common.css
index 21f6acd..8dad0ef 100644
--- a/broker-plugins/management-http/src/main/java/resources/css/common.css
+++ b/broker-plugins/management-http/src/main/java/resources/css/common.css
@@ -744,3 +744,8 @@ td.advancedSearchField, col.autoWidth {
{
font-style: italic;
}
+
+.claro .loading
+{
+ background-image: url("../dojo/dijit/themes/claro/images/loadingAnimation.gif");
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/editQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/editQueue.html b/broker-plugins/management-http/src/main/java/resources/editQueue.html
deleted file mode 100644
index d5883e0..0000000
--- a/broker-plugins/management-http/src/main/java/resources/editQueue.html
+++ /dev/null
@@ -1,321 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-<div class="dijitHidden">
- <div data-dojo-type="dijit.Dialog" data-dojo-props="title:'Edit Queue'" id="editQueue">
- <form id="formEditQueue" method="post" dojoType="dijit.form.Form">
- <div id="formEditQueue.allFields">
- <div id="formEditQueue.contentPane">
- <div class="editNoteBanner">NOTE: All changes will only take effect after Virtual Host restart.</div>
- <div class="clear">
- <div class="formLabel-labelCell">Queue Name:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.name"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'name',
- placeHolder: 'queue name',
- required: true,
- promptMessage: 'Name of queue',
- disabled: 'true'"/>
- </div>
- </div>
-
- <div class="clear">
- <div class="formLabel-labelCell">Queue Type:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.type"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'type',
- placeHolder: 'queue type',
- required: true,
- promptMessage: 'Type of queue',
- disabled: 'true'"/>
- </div>
- </div>
- <div class="clear"></div>
-
- <div id="formEditQueueType:priority" class="hidden typeSpecificDiv">
- <div class="clear">
- <div class="formLabel-labelCell">Priorities:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.priorities"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'priorities',
- disabled: 'true',
- promptMessage: 'Number of priorities supported by the queue',
- title: 'Enter the number of priorities supported by the queue',
- trim: true"/>
- </div>
- </div>
- <div class="clear"></div>
- </div>
-
- <div id="formEditQueueType:lvq" class="hidden typeSpecificDiv">
- <div class="clear">
- <div class="formLabel-labelCell">LVQ Message Property:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.lvqKey"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'lvqKey',
- disabled: 'true',
- promptMessage: 'Name of the message property used to perform the conflation',
- title: 'Enter the name of the message property used to perform the conflation',
- trim: true"/>
- </div>
- </div>
- <div class="clear"></div>
- </div>
-
- <div id="formEditQueueType:sorted" class="hidden typeSpecificDiv">
- <div class="clear">
- <div class="formLabel-labelCell">Sort Message Property*:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.sortKey"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'sortKey',
- disabled: 'true',
- promptMessage: 'Name of the message property used for sorting the messages on the queue',
- title: 'Enter the name of the message property used for sorting the messages on the queue',
- trim: true"/>
- </div>
- </div>
- <div class="clear"></div>
- </div>
-
- <div class="clear">
- <div class="formLabel-labelCell">Durable?</div>
- <div class="formLabel-controlCell">
- <input type="checkbox" id="formEditQueue.durable"
- dojoType="dijit.form.CheckBox"
- data-dojo-props="
- name: 'durable',
- value: 'durable',
- disabled: 'true'"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Persist Messages?</div>
- <div class="formLabel-controlCell">
- <select id="formEditQueue.messageDurability"
- dojoType="dijit.form.FilteringSelect"
- data-dojo-props="
- name: 'messageDurability',
- value: '',
- searchAttr: 'name',
- required: false,
- promptMessage: 'Message persistence override. If not default, messages arriving will have persistence setting overridden',
- title: 'Enter message persistence override'">
- <option value="ALWAYS">Always</option>
- <option value="DEFAULT">Default</option>
- <option value="NEVER">Never</option>
- </select>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Maximum Ttl:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.maximumMessageTtl"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maximumMessageTtl',
- placeHolder: 'ttl in ms',
- promptMessage: 'Maximum message time to live (ttl) in ms. Messages arriving with larger ttl values will be overridden by this value',
- title: 'Enter the maximum message time to live in milliseconds',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Minimum Ttl:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.minimumMessageTtl"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'minimumMessageTtl',
- placeHolder: 'ttl in ms',
- promptMessage: 'Minimum message time to live (ttl) in ms. Messages arriving with smaller ttl values will be overridden by this value',
- title: 'Enter the minimum message time to live in milliseconds',
- trim: true"/>
- </div>
- </div>
-
- <div class="clear formBox">
- <fieldset>
- <legend>Overflow Settings</legend>
- <div class="clear">
- <div class="formLabel-labelCell">Overflow policy:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.overflowPolicy"
- data-dojo-type="dijit/form/FilteringSelect"
- data-dojo-props="
- name: 'overflowPolicy',
- required: false,
- promptMessage: 'Select overflow policy to use',
- title: 'Select overflow policy'"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Maximum Queue Depth (Messages):</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.maximumQueueDepthMessages"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maximumQueueDepthMessages',
- placeHolder: 'maximum number of messages',
- promptMessage: 'Maximum number of messages on the queue. If overflow policy is None or value is negative, no maximum is enforeced.',
- title: 'Enter the maximum number of messages on the queue',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Maximum Queue Depth (Bytes):</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.maximumQueueDepthBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maximumQueueDepthBytes',
- placeHolder: 'maximum number of bytes',
- promptMessage: 'Maximum number of bytes on the queue. If overflow policy is None or value is negative, no maximum is enforeced.',
- title: 'Enter the maximum number of bytes on the queue',
- trim: true"/>
- </div>
- </div>
- </fieldset>
- <div class="clear"></div>
- </div>
-
- <div class="clear formBox">
- <fieldset>
- <legend>Alerting Settings</legend>
- <div class="clear">
- <div class="formLabel-labelCell">Queue Depth (Messages):</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.alertThresholdQueueDepthMessages"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'alertThresholdQueueDepthMessages',
- placeHolder: 'number of messages',
- promptMessage: 'Ceiling value for number of messages on queue before alerts will be generated',
- title: 'Enter the ceiling value for number of messages on queue before alerts will be generated',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Queue Depth (Bytes):</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.alertThresholdQueueDepthBytes"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'alertThresholdQueueDepthBytes',
- placeHolder: 'total message size in bytes',
- promptMessage: 'Ceiling value (in bytes) for total size of all messages on the queue before alerts will be generated',
- title: 'Enter the ceiling value (in bytes) for total size of all messages on the queue before alerts will be generated',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Message Age:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.alertThresholdMessageAge"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'alertThresholdMessageAge',
- placeHolder: 'time in ms',
- promptMessage: 'Message age (in milliseconds) above which alerts will be generated',
- title: 'Enter the message age (in milliseconds) above which alerts will be generated',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Message Size:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.alertThresholdMessageSize"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'alertThresholdMessageSize',
- placeHolder: 'message size in bytes',
- promptMessage: 'Message size (in bytes) above which alerts will be generated',
- title: 'Enter the message size (in bytes) above which alerts will be generated',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Gap between alerts:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.alertRepeatGap"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'alertRepeatGap',
- placeHolder: 'time in ms',
- promptMessage: 'Minimum time (in milliseconds) between each alert',
- title: 'Enter the minimum time (in milliseconds) between each alert.',
- trim: true"/>
- </div>
- </div>
- </fieldset>
- <div class="clear"></div>
- </div>
-
- <div class="clear formBox">
- <fieldset>
- <legend>Other Settings</legend>
- <div class="clear">
- <div class="formLabel-labelCell">Maximum Delivery Retries:</div>
- <div class="formLabel-controlCell">
- <input type="text" id="formEditQueue.maximumDeliveryAttempts"
- data-dojo-type="dijit/form/ValidationTextBox"
- data-dojo-props="
- name: 'maximumDeliveryAttempts',
- placeHolder: 'number of retries',
- promptMessage: 'Maximum number of delivery attempts before the message will be sent to the alternate exchange',
- title: 'Enter the maximum number of delivery attempts before the message will be sent to the alternate exchange',
- trim: true"/>
- </div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Hold on Publish Enabled?</div>
- <div class="formLabel-controlCell">
- <input type="checkbox" id="formEditQueue.holdOnPublishEnabled"
- dojoType="dijit.form.CheckBox"
- data-dojo-props="
- name: 'holdOnPublishEnabled',
- value: 'holdOnPublishEnabled',
- title: 'If enabled the messages on the queue will be inspected for the x-qpid-not-valid-before header/annotation'"/>
- </div>
- </div>
- </fieldset>
- <div class="clear"></div>
- </div>
-
- <div data-dojo-type="dijit/TitlePane" data-dojo-props="title: 'Context variables', open: false">
- <div id="formEditQueue.context" ></div>
- </div>
- </div>
- <div class="dijitDialogPaneActionBar qpidDialogPaneActionBar">
- <button data-dojo-type="dijit/form/Button" id="formEditQueue.saveButton" data-dojo-props="label: 'Save'">Save</button>
- <button data-dojo-type="dijit/form/Button" id="formEditQueue.cancelButton" data-dojo-props="label: 'Cancel'" ></button>
- </div>
- </div>
-
- </form>
- </div>
-</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/common/AlternateBinding.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/common/AlternateBinding.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/AlternateBinding.js
new file mode 100644
index 0000000..feb47b3
--- /dev/null
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/common/AlternateBinding.js
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+define(["dojo/_base/declare",
+ "dojo/_base/lang",
+ "dojo/promise/all",
+ "dojo/dom-class",
+ "dojo/store/Memory",
+ "dojo/Deferred",
+ "dijit/form/ComboBox",
+ 'qpid/common/util',
+ "dojo/domReady!"],
+ function (declare, lang, all, domClass, Memory, Deferred, ComboBox, util)
+ {
+ return declare("qpid.common.AlternateBinding", [ComboBox],
+ {
+ loadData: function (management, modelObj)
+ {
+ var deferred = new Deferred();
+ domClass.add(this.domNode, "loading");
+ this.set("placeHolder", " Loading...");
+ this.set("disabled", true);
+ var queuesQuery = management.query({
+ "category": "Queue",
+ "parent": modelObj,
+ "select": "name as id, name",
+ "orderBy": "name"
+ });
+ var exchangesQuery = management.query({
+ "category": "Exchange",
+ "parent": modelObj,
+ "select": "name as id, name",
+ "orderBy": "name"
+ });
+ all({
+ "queues": queuesQuery,
+ "exchanges": exchangesQuery
+ })
+ .then(lang.hitch(this,
+ function (data)
+ {
+ var items = [];
+ try
+ {
+ items.push({"id" : "", "name": "-- Queues --"});
+ items = items.concat(util.queryResultToObjects(data.queues));
+ items.push({"id" : "", "name": "-- Exchanges --"});
+ items = items.concat(util.queryResultToObjects(data.exchanges));
+ var store = new Memory({data: items});
+ this.set("store", store);
+ this.set("disabled", false);
+ domClass.remove(this.domNode, "loading");
+ this.set("placeHolder", "alternate binding");
+ }
+ finally
+ {
+ deferred.resolve(items);
+ }
+ }),
+ function (error)
+ {
+ deferred.reject(error);
+ }
+ );
+ return deferred.promise;
+ },
+
+ valueAsJson: function ()
+ {
+ var destination = this.get("item") ? this.get("item").id : this.get("value");
+ if (destination)
+ {
+ destination = destination.replace(/^\s+|\s+$/gm, '');
+ }
+ if (destination)
+ {
+ return {"destination": destination};
+ }
+ else
+ {
+ return null;
+ }
+ }
+ });
+ });
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
index 500ebf1..2a04258 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Exchange.js
@@ -22,6 +22,7 @@ define(["dojo/_base/xhr",
"dojo/parser",
"dojo/query",
"dojo/_base/connect",
+ "dojo/_base/event",
"dojo/json",
"dojo/_base/lang",
"dojo/promise/all",
@@ -32,6 +33,7 @@ define(["dojo/_base/xhr",
"qpid/common/formatter",
"qpid/common/UpdatableStore",
"qpid/management/addBinding",
+ "qpid/management/addExchange",
"dojox/grid/EnhancedGrid",
"dojox/html/entities",
"dojo/text!showExchange.html",
@@ -40,6 +42,7 @@ define(["dojo/_base/xhr",
parser,
query,
connect,
+ event,
json,
lang,
all,
@@ -50,6 +53,7 @@ define(["dojo/_base/xhr",
formatter,
UpdatableStore,
addBinding,
+ addExchange,
EnhancedGrid,
entities,
template)
@@ -97,14 +101,22 @@ define(["dojo/_base/xhr",
var addBindingButton = query(".addBindingButton", contentPane.containerNode)[0];
connect.connect(registry.byNode(addBindingButton), "onClick", function (evt)
{
+ event.stop(evt);
addBinding.show(that.management, that.modelObj);
});
var deleteBindingButton = query(".deleteBindingButton", contentPane.containerNode)[0];
connect.connect(registry.byNode(deleteBindingButton), "onClick", function (evt)
{
+ event.stop(evt);
that.deleteBindings();
});
+ var editExchangeButton = query(".editExchangeButton", contentPane.containerNode)[0];
+ connect.connect(registry.byNode(editExchangeButton), "onClick", function (evt)
+ {
+ event.stop(evt);
+ addExchange.show(that.management, that.modelObj, that.exchangeUpdater.exchangeData);
+ });
var isStandard = util.isReservedExchangeName(that.name);
var deleteExchangeButton = query(".deleteExchangeButton", contentPane.containerNode)[0];
@@ -117,6 +129,7 @@ define(["dojo/_base/xhr",
{
connect.connect(node, "onClick", function (evt)
{
+ event.stop(evt);
that.deleteExchange();
});
}
@@ -195,15 +208,7 @@ define(["dojo/_base/xhr",
"state",
"durable",
"lifetimePolicy",
- "alertRepeatGap",
- "alertRepeatGapUnits",
- "alertThresholdMessageAge",
- "alertThresholdMessageAgeUnits",
- "alertThresholdMessageSize",
- "alertThresholdMessageSizeUnits",
- "alertThresholdQueueDepthBytes",
- "alertThresholdQueueDepthBytesUnits",
- "alertThresholdQueueDepthMessages",
+ "alternateBinding",
"msgInRate",
"bytesInRate",
"bytesInRateUnits",
@@ -255,7 +260,9 @@ define(["dojo/_base/xhr",
this.state.innerHTML = entities.encode(String(this.exchangeData["state"]));
this.durable.innerHTML = entities.encode(String(this.exchangeData["durable"]));
this.lifetimePolicy.innerHTML = entities.encode(String(this.exchangeData["lifetimePolicy"]));
-
+ this.alternateBinding.innerHTML =
+ this.exchangeData["alternateBinding"] && this.exchangeData["alternateBinding"]["destination"]
+ ? entities.encode(String(this.exchangeData["alternateBinding"]["destination"])) : "";
};
ExchangeUpdater.prototype.update = function (callback)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index d68ed2a..6ba1607 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -35,7 +35,7 @@ define(["dojo/_base/declare",
"qpid/management/addBinding",
"qpid/management/moveCopyMessages",
"qpid/management/showMessage",
- "qpid/management/editQueue",
+ "qpid/management/addQueue",
"qpid/common/JsonRest",
"dojox/grid/EnhancedGrid",
"qpid/management/query/QueryGrid",
@@ -62,7 +62,7 @@ define(["dojo/_base/declare",
addBinding,
moveMessages,
showMessage,
- editQueue,
+ addQueue,
JsonRest,
EnhancedGrid,
QueryGrid,
@@ -243,7 +243,7 @@ define(["dojo/_base/declare",
connect.connect(registry.byNode(editQueueButton), "onClick", function (evt)
{
event.stop(evt);
- editQueue.show(that.management, that.modelObj);
+ addQueue.show(that.management, that.modelObj, that.queueUpdater.queueData);
});
that.queueUpdater.update(function ()
{
@@ -405,7 +405,7 @@ define(["dojo/_base/declare",
"alertThresholdQueueDepthBytes",
"alertThresholdQueueDepthBytesUnits",
"alertThresholdQueueDepthMessages",
- "alternateExchange",
+ "alternateBinding",
"messageGroups",
"messageGroupKey",
"messageGroupSharedGroups",
@@ -560,8 +560,9 @@ define(["dojo/_base/declare",
this.minimumMessageTtl.innerHTML = entities.encode(String(this.queueData["minimumMessageTtl"]));
this.maximumMessageTtl.innerHTML = entities.encode(String(this.queueData["maximumMessageTtl"]));
- this.alternateExchange.innerHTML =
- this.queueData["alternateExchange"] ? entities.encode(String(this.queueData["alternateExchange"])) : "";
+ this.alternateBinding.innerHTML =
+ this.queueData["alternateBinding"] && this.queueData["alternateBinding"]["destination"]
+ ? entities.encode(String(this.queueData["alternateBinding"]["destination"])) : "";
this.queueDepthMessagesIncludingHeader.innerHTML = entities.encode(String(this.queueData["queueDepthMessages"]));
bytesDepth = formatter.formatBytes(this.queueData["queueDepthBytesIncludingHeader"]);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addExchange.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addExchange.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addExchange.js
index 6010995..1337401 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addExchange.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addExchange.js
@@ -23,104 +23,180 @@ define(["dojo/dom",
"dojo/_base/window",
"dijit/registry",
"dojo/parser",
+ "dojo/_base/lang",
"dojo/_base/array",
"dojo/_base/event",
"dojo/_base/json",
"qpid/common/util",
"dojo/text!addExchange.html",
- "dijit/form/NumberSpinner", // required by the form
- /* dojox/ validate resources */
+ "qpid/common/AlternateBinding",
+ "qpid/common/ContextVariablesEditor",
"dojox/validate/us",
"dojox/validate/web",
- /* basic dijit classes */
+ "dijit/TitlePane",
"dijit/Dialog",
"dijit/form/CheckBox",
- "dijit/form/Textarea",
"dijit/form/FilteringSelect",
- "dijit/form/TextBox",
"dijit/form/ValidationTextBox",
- "dijit/form/DateTextBox",
- "dijit/form/TimeTextBox",
"dijit/form/Button",
- "dijit/form/RadioButton",
"dijit/form/Form",
- "dijit/form/DateTextBox",
- /* basic dojox classes */
- "dojox/form/BusyButton",
- "dojox/form/CheckedMultiSelect",
- "dojo/domReady!"], function (dom, construct, win, registry, parser, array, event, json, util, template)
+ "dojo/domReady!"], function (dom, construct, win, registry, parser, lang, array, event, json, util, template)
{
+ var hideDialog = function ()
+ {
+ registry.byId("addExchange").hide();
+ };
- var addExchange = {};
+ var addExchange = {
+ _init: function ()
+ {
+ var node = construct.create("div", {innerHTML: template});
+ parser.parse(node)
+ .then(lang.hitch(this, function (instances)
+ {
+ this._postParse();
+ }));
+ },
+ _postParse: function ()
+ {
+ this.alternateBinding = registry.byId("formAddExchange.alternateBinding");
+ this.form = registry.byId("formAddExchange");
+ this.exchangeName = registry.byId("formAddExchange.name");
+ this.exchangeDurable = registry.byId("formAddExchange.durable");
+ this.exchangeName.set("regExpGen", util.nameOrContextVarRegexp);
+ this.exchangeType = registry.byId("formAddExchange.type");
+ this.context = registry.byId("formAddExchange.context");
- var node = construct.create("div", null, win.body(), "last");
+ registry.byId("formAddExchange.cancelButton")
+ .on("click", function (e)
+ {
+ event.stop(e);
+ hideDialog();
+ });
- var theForm;
- node.innerHTML = template;
- addExchange.dialogNode = dom.byId("addExchange");
- parser.instantiate([addExchange.dialogNode]);
+ registry.byId("formAddExchange.saveButton")
+ .on("click", function (e)
+ {
+ addExchange._submit(e);
+ });
- theForm = registry.byId("formAddExchange");
- array.forEach(theForm.getDescendants(), function (widget)
- {
- if (widget.name === "type")
- {
- widget.on("change", function (isChecked)
+ array.forEach(this.form.getDescendants(), function (widget)
{
-
- var obj = registry.byId(widget.id + ":fields");
- if (obj)
+ if (widget.name === "type")
{
- if (isChecked)
- {
- obj.domNode.style.display = "block";
- obj.resize();
- }
- else
+ widget.on("change", function (isChecked)
{
- obj.domNode.style.display = "none";
- obj.resize();
- }
- }
- })
- }
- });
+ var obj = registry.byId(widget.id + ":fields");
+ if (obj)
+ {
+ if (isChecked)
+ {
+ obj.domNode.style.display = "block";
+ obj.resize();
+ }
+ else
+ {
+ obj.domNode.style.display = "none";
+ obj.resize();
+ }
+ }
+ })
+ }
- theForm.on("submit", function (e)
- {
+ });
+ },
- event.stop(e);
- if (theForm.validate())
+ show: function (management, modelObj, effectiveData)
{
- var newExchange = util.getFormWidgetValues(theForm, null);
- var that = this;
- addExchange.management.create("exchange", addExchange.modelObj, newExchange)
- .then(function (x)
+ this.management = management;
+ this.modelObj = modelObj;
+
+ this.alternateBindingLoadPromise =
+ this.alternateBinding.loadData(management, effectiveData ? modelObj.parent : modelObj);
+ this.form.reset();
+
+ if (effectiveData)
+ {
+ var afterLoad = lang.hitch(this, function (data)
{
- registry.byId("addExchange")
- .hide();
+ var actualData = data.actual;
+ var effectiveData = data.effective;
+ this.initialData = actualData;
+ this.effectiveData = effectiveData;
+ this.exchangeType.set("value", actualData.type);
+ this.exchangeType.set("disabled", true);
+ this.exchangeName.set("disabled", true);
+ this.exchangeDurable.set("disabled", true);
+ this.exchangeName.set("value", actualData.name);
+ this.context.setData(actualData.context, effectiveData.context, data.inheritedActual.context);
+ this._show();
});
- return false;
+ util.loadData(management, modelObj, afterLoad, {depth: 1});
+ }
+ else
+ {
+ this.exchangeType.set("disabled", false);
+ this.exchangeName.set("disabled", false);
+ this.exchangeDurable.set("disabled", false);
+ this.initialData = {};
+ this.effectiveData = {};
+ util.loadEffectiveAndInheritedActualData(management, modelObj, lang.hitch(this, function (data)
+ {
+ this.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
+ this._show();
+ }), {depth: 1});
+ }
+ },
- }
- else
+ _show: function ()
{
- alert('Form contains invalid data. Please correct first');
- return false;
- }
+ this.alternateBindingLoadPromise.then(lang.hitch(this, function ()
+ {
+ var alternate = this.initialData.alternateBinding;
+ if (alternate && alternate.destination)
+ {
+ this.alternateBinding.set("value", alternate.destination);
+ }
+ }));
- });
+ util.applyToWidgets(this.form.domNode,
+ "Exchange",
+ this.initialData.type || "direct",
+ this.initialData,
+ this.management.metadata);
- addExchange.show = function (management, modelObj)
- {
- addExchange.management = management
- addExchange.modelObj = modelObj;
- registry.byId("formAddExchange")
- .reset();
- registry.byId("addExchange")
- .show();
- };
+ registry.byId("addExchange").show();
+ },
+
+ _submit : function (e)
+ {
+ event.stop(e);
+ if (this.form.validate())
+ {
+ var exchangeData = util.getFormWidgetValues(this.form, this.initialData);
+ var context = this.context.get("value");
+ if (context)
+ {
+ exchangeData["context"] = context;
+ }
+ exchangeData.alternateBinding = this.alternateBinding.valueAsJson();
+ if (this.initialData && this.initialData.id)
+ {
+ this.management.update(this.modelObj, exchangeData) .then(hideDialog);
+ }
+ else
+ {
+ this.management.create("exchange", this.modelObj, exchangeData) .then(hideDialog);
+ }
+ }
+ else
+ {
+ alert('Form contains invalid data. Please correct first');
+ }
+ }
+ };
+ addExchange._init();
return addExchange;
});
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
index 0cf5771..50ef327 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
@@ -23,155 +23,219 @@ define(["dojo/dom",
"dojo/_base/window",
"dijit/registry",
"dojo/parser",
+ "dojo/_base/lang",
"dojo/_base/array",
"dojo/_base/event",
'dojo/_base/json',
"dojo/query",
'qpid/common/util',
"dojo/text!addQueue.html",
+ "qpid/common/AlternateBinding",
"qpid/common/ContextVariablesEditor",
- "dijit/form/NumberSpinner", // required by the form
- /* dojox/ validate resources */
"dojox/validate/us",
"dojox/validate/web",
- /* basic dijit classes */
"dijit/Dialog",
"dijit/form/CheckBox",
- "dijit/form/Textarea",
"dijit/form/FilteringSelect",
- "dijit/form/TextBox",
"dijit/form/ValidationTextBox",
- "dijit/form/DateTextBox",
- "dijit/form/TimeTextBox",
"dijit/form/Button",
- "dijit/form/RadioButton",
"dijit/form/Form",
- "dijit/form/DateTextBox",
- /* basic dojox classes */
- "dojox/form/BusyButton",
- "dojox/form/CheckedMultiSelect",
- "dojo/domReady!"], function (dom, construct, win, registry, parser, array, event, json, query, util, template)
-{
-
- var addQueue = {};
-
- var node = construct.create("div", null, win.body(), "last");
-
- var requiredFields = {sorted: "sortKey"};
-
- var numericFieldNames = ["maximumMessageTtl",
- "minimumMessageTtl",
- "alertThresholdQueueDepthMessages",
- "alertThresholdQueueDepthBytes",
- "alertThresholdMessageAge",
- "alertThresholdMessageSize",
- "alertRepeatGap",
- "maximumDeliveryAttempts"];
-
- var theForm;
- node.innerHTML = template;
- addQueue.dialogNode = dom.byId("addQueue");
- parser.instantiate([addQueue.dialogNode]);
-
- // for children which have name type, add a function to make all the associated atrributes
- // visible / invisible as the select is changed
- theForm = registry.byId("formAddQueue");
- var typeSelector = registry.byId("formAddQueue.type");
- typeSelector.on("change", function (value)
+ "dojo/domReady!"],
+ function (dom, construct, win, registry, parser, lang, array, event, json, query, util, template)
{
- query(".typeSpecificDiv")
- .forEach(function (node, index, arr)
+ var hideDialog = function ()
+ {
+ registry.byId("addQueue")
+ .hide();
+ };
+
+ var requiredFields = {sorted: "sortKey"};
+
+ var numericFieldNames = ["maximumMessageTtl",
+ "minimumMessageTtl",
+ "alertThresholdQueueDepthMessages",
+ "alertThresholdQueueDepthBytes",
+ "alertThresholdMessageAge",
+ "alertThresholdMessageSize",
+ "alertRepeatGap",
+ "maximumDeliveryAttempts"];
+
+ var addQueue = {
+ _init: function ()
{
- if (node.id === "formAddQueueType:" + value)
- {
- node.style.display = "block";
- if (addQueue.management)
+ var node = construct.create("div", {innerHTML: template});
+ parser.parse(node)
+ .then(lang.hitch(this, function (instances)
{
- util.applyMetadataToWidgets(node, "Queue", value, addQueue.management.metadata);
- }
- }
- else
+ this._postParse();
+ }));
+ },
+ _postParse: function ()
+ {
+ this.alternateBinding = registry.byId("formAddQueue.alternateBinding");
+ this.form = registry.byId("formAddQueue");
+
+ for (var i = 0; i < numericFieldNames.length; i++)
{
- node.style.display = "none";
+ registry.byId("formAddQueue." + numericFieldNames[i])
+ .set("regExpGen", util.numericOrContextVarRegexp);
}
- });
- for (var requiredField in requiredFields)
- {
- dijit.byId('formAddQueue.' + requiredFields[requiredField]).required = (requiredField == value);
- }
- });
- theForm.on("submit", function (e)
- {
+ registry.byId("formAddQueue.maximumQueueDepthBytes")
+ .set("regExpGen", util.signedOrContextVarRegexp);
+ registry.byId("formAddQueue.maximumQueueDepthMessages")
+ .set("regExpGen", util.signedOrContextVarRegexp);
- event.stop(e);
- if (theForm.validate())
- {
+ this.queueName = registry.byId("formAddQueue.name");
+ this.queueName.set("regExpGen", util.nameOrContextVarRegexp);
+ this.queueDurable = registry.byId("formAddQueue.durable");
+ this.queueType = registry.byId("formAddQueue.type");
+ this.context = registry.byId("formAddQueue.context");
+ this.overflowPolicyWidget = registry.byId("formAddQueue.overflowPolicy");
+ this.editNodeBanner = dom.byId("addQueue.editNoteBanner");
- var newQueue = util.getFormWidgetValues(theForm);
- var context = addQueue.context.get("value");
- if (context)
- {
- newQueue["context"] = context;
- }
- addQueue.management.create("queue", addQueue.modelObj, newQueue)
- .then(function (x)
+ registry.byId("formAddQueue.cancelButton")
+ .on("click", function (e)
+ {
+ event.stop(e);
+ hideDialog();
+ });
+
+ registry.byId("formAddQueue.saveButton")
+ .on("click", lang.hitch(this, function (e)
+ {
+ this._submit(e);
+ }));
+
+ registry.byId("formAddQueue.type")
+ .on("change", function (value)
+ {
+ query(".typeSpecificDiv")
+ .forEach(function (node, index, arr)
+ {
+ if (node.id === "formAddQueueType:" + value)
+ {
+ node.style.display = "block";
+ if (addQueue.management)
+ {
+ util.applyMetadataToWidgets(node, "Queue", value, addQueue.management.metadata);
+ }
+ }
+ else
+ {
+ node.style.display = "none";
+ }
+ });
+ for (var requiredField in requiredFields)
+ {
+ dijit.byId('formAddQueue.' + requiredFields[requiredField]).required =
+ (requiredField == value);
+ }
+ });
+ },
+
+ _submit: function (e)
+ {
+ event.stop(e);
+ if (this.form.validate())
{
- registry.byId("addQueue")
- .hide();
- });
- return false;
+ var queueData = util.getFormWidgetValues(this.form, this.initialData);
+ var context = this.context.get("value");
+ if (context)
+ {
+ queueData["context"] = context;
+ }
+ queueData.alternateBinding = this.alternateBinding.valueAsJson();
- }
- else
- {
- alert('Form contains invalid data. Please correct first');
- return false;
- }
+ if (this.initialData && this.initialData.id)
+ {
+ this.management.update(this.modelObj, queueData)
+ .then(hideDialog);
+ }
+ else
+ {
+ this.management.create("queue", this.modelObj, queueData)
+ .then(hideDialog);
+ }
+ return false;
+ }
+ else
+ {
+ alert('Form contains invalid data. Please correct first');
+ return false;
+ }
+ },
- });
+ show: function (management, modelObj, effectiveData)
+ {
+ this.management = management;
+ this.modelObj = modelObj;
- addQueue.show = function (management, modelObj)
- {
- addQueue.management = management;
- addQueue.modelObj = modelObj;
-
- var form = registry.byId("formAddQueue");
- form.reset();
- registry.byId("addQueue")
- .show();
- util.applyMetadataToWidgets(form.domNode, "Queue", "standard", addQueue.management.metadata);
-
- var overflowPolicyWidget = registry.byId("formAddQueue.overflowPolicy");
- var validValues = addQueue.management.metadata.getMetaData("Queue", "standard").attributes.overflowPolicy.validValues;
- var validValueStore = util.makeTypeStore(validValues);
- overflowPolicyWidget.set("store", validValueStore);
-
- // Add regexp to the numeric fields
- for (var i = 0; i < numericFieldNames.length; i++)
- {
- registry.byId("formAddQueue." + numericFieldNames[i])
- .set("regExpGen", util.numericOrContextVarRegexp);
- }
+ this.alternateBindingLoadPromise =
+ this.alternateBinding.loadData(management, effectiveData ? modelObj.parent : modelObj);
+ this.form.reset();
- registry.byId("formAddQueue.maximumQueueDepthBytes").set("regExpGen", util.signedOrContextVarRegexp);
- registry.byId("formAddQueue.maximumQueueDepthMessages").set("regExpGen", util.signedOrContextVarRegexp);
+ if (effectiveData)
+ {
+ var afterLoad = lang.hitch(this, function (data)
+ {
+ var actualData = data.actual;
+ var effectiveData = data.effective;
+ this.initialData = actualData;
+ this.effectiveData = effectiveData;
+ this.queueType.set("value", actualData.type);
+ this.queueType.set("disabled", true);
+ this.queueName.set("disabled", true);
+ this.queueDurable.set("disabled", true);
+ this.queueName.set("value", actualData.name);
+ this.context.setData(actualData.context, effectiveData.context, data.inheritedActual.context);
+ this.editNodeBanner.style.display = "block";
+ this._show();
+ });
+ util.loadData(management, modelObj, afterLoad, {depth: 1});
+ }
+ else
+ {
+ this.editNodeBanner.style.display = "none";
+ this.queueType.set("disabled", false);
+ this.queueName.set("disabled", false);
+ this.queueDurable.set("disabled", false);
+ this.initialData = {"type": "standard"};
+ this.effectiveData = {};
+ util.loadEffectiveAndInheritedActualData(management, modelObj, lang.hitch(this, function (data)
+ {
+ this.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
+ this._show();
+ }), {depth: 1});
+ }
+ },
- if (!this.context)
- {
- this.context = new qpid.common.ContextVariablesEditor({
- name: 'context',
- title: 'Context variables'
- });
- this.context.placeAt(dom.byId("formAddQueue.context"));
- }
-
- util.loadEffectiveAndInheritedActualData(management, modelObj, function (data)
- {
- addQueue.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
- });
- };
+ _show: function ()
+ {
+ this.alternateBindingLoadPromise.then(lang.hitch(this, function ()
+ {
+ var alternate = this.initialData.alternateBinding;
+ if (alternate && alternate.destination)
+ {
+ this.alternateBinding.set("value", alternate.destination);
+ }
+ }));
+
+ util.applyToWidgets(this.form.domNode,
+ "Queue",
+ this.initialData.type,
+ this.initialData,
+ this.management.metadata);
+
+ var validValues = this.management.metadata.getMetaData("Queue",
+ this.initialData.type).attributes.overflowPolicy.validValues;
+ var validValueStore = util.makeTypeStore(validValues);
+ this.overflowPolicyWidget.set("store", validValueStore);
+ registry.byId("addQueue").show();
+ }
+ };
- return addQueue;
-});
+ addQueue._init();
+ return addQueue;
+ });
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
deleted file mode 100644
index ed76d33..0000000
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-define(["dojox/html/entities",
- "dojo/_base/array",
- "dojo/_base/event",
- "dojo/_base/lang",
- "dojo/_base/window",
- "dojo/dom",
- "dojo/dom-construct",
- "dijit/registry",
- "dojo/parser",
- 'dojo/json',
- "dojo/query",
- "qpid/common/util",
- "dojo/text!editQueue.html",
- "qpid/common/ContextVariablesEditor",
- "dijit/Dialog",
- "dijit/form/CheckBox",
- "dijit/form/FilteringSelect",
- "dijit/form/ValidationTextBox",
- "dijit/form/Button",
- "dijit/form/Form",
- "dojox/validate/us",
- "dojox/validate/web",
- "dojo/domReady!"],
- function (entities, array, event, lang, win, dom, domConstruct, registry, parser, json, query, util, template)
- {
-
- var numericFieldNames = ["maximumMessageTtl",
- "minimumMessageTtl",
- "alertThresholdQueueDepthMessages",
- "alertThresholdQueueDepthBytes",
- "alertThresholdMessageAge",
- "alertThresholdMessageSize",
- "alertRepeatGap",
- "maximumDeliveryAttempts"];
-
- var queueEditor = {
- init: function ()
- {
- var that = this;
- this.containerNode = domConstruct.create("div", {innerHTML: template});
- parser.parse(this.containerNode)
- .then(function (instances)
- {
- that._postParse();
- });
- },
- _postParse: function ()
- {
- var that = this;
- this.allFieldsContainer = dom.byId("formEditQueue.allFields");
- this.dialog = registry.byId("editQueue");
- this.saveButton = registry.byId("formEditQueue.saveButton");
- this.cancelButton = registry.byId("formEditQueue.cancelButton");
- this.cancelButton.on("click", function (e)
- {
- that._cancel(e);
- });
- this.saveButton.on("click", function (e)
- {
- that._save(e);
- });
- this.form = registry.byId("formEditQueue");
- this.form.on("submit", function ()
- {
- return false;
- });
- this.typeSelector = registry.byId("formEditQueue.type");
- },
- show: function (management, modelObj)
- {
- this.management = management;
- this.modelObj = modelObj;
- if (!this.context)
- {
- this.context = new qpid.common.ContextVariablesEditor({
- name: 'context',
- title: 'Context variables'
- });
- this.context.placeAt(dom.byId("formEditQueue.context"));
- }
- this.dialog.set("title", "Edit Queue - " + entities.encode(String(modelObj.name)));
- util.loadData(management, modelObj, lang.hitch(this, this._show));
- },
- destroy: function ()
- {
- if (this.dialog)
- {
- this.dialog.destroyRecursive();
- this.dialog = null;
- }
-
- if (this.containerNode)
- {
- domConstruct.destroy(this.containerNode);
- this.containerNode = null;
- }
- },
- _cancel: function (e)
- {
- this.dialog.hide();
- },
- _save: function (e)
- {
- event.stop(e);
- if (this.form.validate())
- {
- var data = util.getFormWidgetValues(this.form, this.initialData);
- var context = this.context.get("value");
- if (context && !util.equals(context, this.initialData.context))
- {
- data["context"] = context;
- }
- var that = this;
- this.management.update(that.modelObj, data)
- .then(function (x)
- {
- that.dialog.hide()
- });
- }
- else
- {
- alert('Form contains invalid data. Please correct first');
- }
- },
- _show: function (data)
- {
- this.initialData = data.actual;
- this.form.reset();
-
- var that = this;
-
- var overflowPolicyWidget = registry.byId("formEditQueue.overflowPolicy");
- var validValues = this.management.metadata.getMetaData("Queue", "standard").attributes.overflowPolicy.validValues;
- var validValueStore = util.makeTypeStore(validValues);
- overflowPolicyWidget.set("store", validValueStore);
-
- util.applyToWidgets(that.allFieldsContainer,
- "Queue",
- data.actual.type,
- data.actual,
- this.management.metadata,
- data.effective
- );
-
- this.context.setData(data.actual.context, data.effective.context, data.inheritedActual.context);
-
- // Add regexp to the numeric fields
- for (var i = 0; i < numericFieldNames.length; i++)
- {
- registry.byId("formEditQueue." + numericFieldNames[i])
- .set("regExpGen", util.numericOrContextVarRegexp);
- }
-
- registry.byId("formEditQueue.maximumQueueDepthBytes").set("regExpGen", util.signedOrContextVarRegexp);
- registry.byId("formEditQueue.maximumQueueDepthMessages").set("regExpGen", util.signedOrContextVarRegexp);
-
- var queueType = this.typeSelector.get("value");
- query(".typeSpecificDiv")
- .forEach(function (node, index, arr)
- {
- if (node.id === "formEditQueueType:" + queueType)
- {
- node.style.display = "block";
- util.applyMetadataToWidgets(node, "Queue", queueType, that.management.metadata);
- }
- else
- {
- node.style.display = "none";
- }
- });
- this.dialog.startup();
- this.dialog.show();
- if (!this.resizeEventRegistered)
- {
- this.resizeEventRegistered = true;
- util.resizeContentAreaAndRepositionDialog(dom.byId("formEditQueue.contentPane"), this.dialog);
- }
- }
- };
-
- queueEditor.init();
-
- return queueEditor;
- });
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/showExchange.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showExchange.html b/broker-plugins/management-http/src/main/java/resources/showExchange.html
index b67ea2f..0f5aa31 100644
--- a/broker-plugins/management-http/src/main/java/resources/showExchange.html
+++ b/broker-plugins/management-http/src/main/java/resources/showExchange.html
@@ -35,6 +35,18 @@
<div class="formLabel-labelCell">State:</div>
<div class="state formValue-valueCell"></div>
</div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Durable:</div>
+ <div class="durable formValue-valueCell""></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Lifespan:</div>
+ <div class="lifetimePolicy formValue-valueCell""></div>
+ </div>
+ <div class="clear">
+ <div class="formLabel-labelCell">Alternate Binding:</div>
+ <div class="alternateBinding formValue-valueCell"></div>
+ </div>
</div>
<div class="alignRight">
<div class="clear">
@@ -57,27 +69,17 @@
</div>
</div>
</div>
- <div class="clear">
- <div class="formLabel-labelCell">Durable:</div>
- <div class="durable"></div>
- </div>
- <div class="clear">
- <div class="formLabel-labelCell">Lifespan:</div>
- <div class="lifetimePolicy"></div>
- </div>
<div class="clear"></div>
</div>
+ <div class="dijitDialogPaneActionBar">
+ <button data-dojo-type="dijit.form.Button" class="editExchangeButton" type="button">Edit Exchange</button>
+ <button data-dojo-type="dijit.form.Button" class="deleteExchangeButton" type="button">Delete Exchange</button>
+ </div>
<br/>
-
<div data-dojo-type="dijit.TitlePane" data-dojo-props="title: 'Bindings'">
<div class="bindings"></div>
<button data-dojo-type="dijit.form.Button" class="addBindingButton">Add Binding</button>
<button data-dojo-type="dijit.form.Button" class="deleteBindingButton">Delete Binding</button>
</div>
- <br/>
-
- <div class="dijitDialogPaneActionBar">
- <button data-dojo-type="dijit.form.Button" class="deleteExchangeButton" type="button">Delete Exchange</button>
- </div>
</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-http/src/main/java/resources/showQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showQueue.html b/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 9ba180a..72f8ab1 100644
--- a/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -124,8 +124,8 @@
<div class="owner"></div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Alternate Exchange:</div>
- <div class="alternateExchange"></div>
+ <div class="formLabel-labelCell">Alternate Binding:</div>
+ <div class="alternateBinding"></div>
</div>
<div class="clear">
<div class="formLabel-labelCell">Maximum Delivery Attempts:</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index ff696bc..0689eaa 100644
--- a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -90,8 +90,6 @@ public class Asserts
virtualHost.get(VirtualHost.DURABLE));
assertEquals("Unexpected value of attribute " + VirtualHost.LIFETIME_POLICY, LifetimePolicy.PERMANENT.name(),
virtualHost.get(VirtualHost.LIFETIME_POLICY));
- assertEquals("Unexpected value of attribute " + QueueManagingVirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED, Boolean.FALSE,
- virtualHost.get(QueueManagingVirtualHost.QUEUE_DEAD_LETTER_QUEUE_ENABLED));
@SuppressWarnings("unchecked")
Map<String, Object> statistics = (Map<String, Object>) virtualHost.get(STATISTICS_ATTRIBUTE);
@@ -119,7 +117,7 @@ public class Asserts
Queue.LAST_UPDATED_TIME,
Queue.TYPE,
Queue.DESCRIPTION,
- Queue.ALTERNATE_EXCHANGE,
+ Queue.ALTERNATE_BINDING,
Queue.OWNER,
Queue.NO_LOCAL,
LastValueQueue.LVQ_KEY,
@@ -334,7 +332,7 @@ public class Asserts
{
assertNotNull("Exchange " + exchangeName + " is not found!", exchangeData);
assertAttributesPresent(exchangeData, BrokerModel.getInstance().getTypeRegistry().getAttributeNames(Exchange.class),
- Exchange.ALTERNATE_EXCHANGE,
+ Exchange.ALTERNATE_BINDING,
ConfiguredObject.CREATED_BY,
ConfiguredObject.CREATED_TIME,
ConfiguredObject.LAST_UPDATED_BY,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index 901526c..57726e1 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -298,7 +298,7 @@ public class AmqpManagementFacade
return new HashMap<>(bodyMap);
}
}
- throw new IllegalArgumentException("Cannot parse the results from a management read");
+ throw new IllegalArgumentException("Management read failed : " + response.getStringProperty("statusCode") + " - " + response.getStringProperty("statusDescription"));
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-broker-j git commit: QPID-7606: Remodel alternateExchange
as alternateBinding
Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
index 3b209c0..73a4597 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/DirectExchangeTest.java
@@ -19,16 +19,18 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -124,19 +126,19 @@ public class DirectExchangeTest extends QpidTestCase
Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.DURABLE, false);
- attributes.put(Queue.ALTERNATE_EXCHANGE, _exchange.getName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
Queue queue = (Queue) _vhost.createChild(Queue.class, attributes);
queue.open();
- assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateExchange());
+ assertEquals("Unexpected alternate exchange on queue", _exchange, queue.getAlternateBindingDestination());
try
{
_exchange.delete();
- fail("Exchange deletion should fail with ExchangeIsAlternateException");
+ fail("Exchange deletion should fail with MessageDestinationIsAlternateException");
}
- catch(ExchangeIsAlternateException e)
+ catch(MessageDestinationIsAlternateException e)
{
// pass
}
@@ -145,4 +147,78 @@ public class DirectExchangeTest extends QpidTestCase
assertEquals("Unexpected desired exchange state", State.ACTIVE, _exchange.getDesiredState());
}
+ public void testAlternateBindingValidationRejectsNonExistingDestination()
+ {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, getTestName());
+ attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, "nonExisting"));
+
+ try
+ {
+ _vhost.createChild(Exchange.class, attributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBindingValidationRejectsSelf()
+ {
+ Map<String, String> alternateBinding = Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName());
+ Map<String, Object> newAttributes = Collections.singletonMap(Exchange.ALTERNATE_BINDING, alternateBinding);
+ try
+ {
+ _exchange.setAttributes(newAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testDurableExchangeRejectsNonDurableAlternateBinding()
+ {
+ Map<String, Object> dlqAttributes = new HashMap<>();
+ String dlqName = getTestName() + "_DLQ";
+ dlqAttributes.put(Queue.NAME, dlqName);
+ dlqAttributes.put(Queue.DURABLE, false);
+ _vhost.createChild(Queue.class, dlqAttributes);
+
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put(Exchange.NAME, getTestName());
+ exchangeAttributes.put(Exchange.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, dlqName));
+ exchangeAttributes.put(Exchange.DURABLE, true);
+ exchangeAttributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+
+ try
+ {
+ _vhost.createChild(Exchange.class, exchangeAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBinding()
+ {
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Exchange.NAME, getTestName());
+ attributes.put(Exchange.TYPE, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, _exchange.getName()));
+ attributes.put(Exchange.DURABLE, false);
+
+ Exchange newExchange = _vhost.createChild(Exchange.class, attributes);
+
+ assertEquals("Unexpected alternate binding",
+ _exchange.getName(),
+ newExchange.getAlternateBinding().getDestination());
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
index f8e704d..baf5d0f 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -37,7 +38,7 @@ public class BindingLogSubjectTest extends AbstractTestLogSubject
private Queue<?> _queue;
private String _routingKey;
private Exchange<?> _exchange;
- private VirtualHost _testVhost;
+ private QueueManagingVirtualHost _testVhost;
@Override
public void setUp() throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
index 3c37728..4705053 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/QueueLogSubjectTest.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.logging.subjects;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,7 +35,7 @@ public class QueueLogSubjectTest extends AbstractTestLogSubject
{
private Queue<?> _queue;
- private VirtualHost _testVhost;
+ private QueueManagingVirtualHost _testVhost;
@Override
public void setUp() throws Exception
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 003fc06..1d1578c 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.TestConsumerTarget;
@@ -59,6 +60,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
@@ -70,6 +72,7 @@ import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -965,6 +968,92 @@ abstract class AbstractQueueTestBase extends QpidTestCase
assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
}
+ public void testAlternateBindingValidationRejectsNonExistingDestination()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, "nonExisting"));
+
+ try
+ {
+ _virtualHost.createChild(Queue.class, attributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBindingValidationRejectsSelf()
+ {
+ Map<String, String> alternateBinding = Collections.singletonMap(AlternateBinding.DESTINATION, _qname);
+ Map<String, Object> newAttributes = Collections.singletonMap(Queue.ALTERNATE_BINDING, alternateBinding);
+ try
+ {
+ _queue.setAttributes(newAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testDurableQueueRejectsNonDurableAlternateBinding()
+ {
+ Map<String, Object> dlqAttributes = new HashMap<>(_arguments);
+ String dlqName = getTestName() + "_DLQ";
+ dlqAttributes.put(Queue.NAME, dlqName);
+ dlqAttributes.put(Queue.DURABLE, false);
+ _virtualHost.createChild(Queue.class, dlqAttributes);
+
+ Map<String, Object> queueAttributes = new HashMap<>(_arguments);
+ queueAttributes.put(Queue.NAME, getTestName());
+ queueAttributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, dlqName));
+ queueAttributes.put(Queue.DURABLE, true);
+
+ try
+ {
+ _virtualHost.createChild(Queue.class, queueAttributes);
+ fail("Expected exception is not thrown");
+ }
+ catch (IllegalConfigurationException e)
+ {
+ // pass
+ }
+ }
+
+ public void testAlternateBinding()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _qname));
+
+ Queue newQueue = _virtualHost.createChild(Queue.class, attributes);
+
+ assertEquals("Unexpected alternate binding", _qname, newQueue.getAlternateBinding().getDestination());
+ }
+
+ public void testDeleteOfQueueSetAsAlternate()
+ {
+ Map<String, Object> attributes = new HashMap<>(_arguments);
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, _qname));
+
+ Queue newQueue = _virtualHost.createChild(Queue.class, attributes);
+ assertEquals("Unexpected alternate binding", _qname, newQueue.getAlternateBinding().getDestination());
+ try
+ {
+ _queue.delete();
+ fail("Expected exception is not thrown");
+ }
+ catch (MessageDestinationIsAlternateException e)
+ {
+ //pass
+ }
+ }
+
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index fae8ccb..53185f4 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -40,9 +40,9 @@ import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
@@ -57,28 +57,23 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
-import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.server.util.FileUtils;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.test.utils.QpidTestCase;
public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTestCase
{
private static final String EXCHANGE = org.apache.qpid.server.model.Exchange.class.getSimpleName();
- private static final String BINDING = org.apache.qpid.server.model.Binding.class.getSimpleName();
private static final String QUEUE = Queue.class.getSimpleName();
-
private static final UUID ANY_UUID = UUID.randomUUID();
private static final Map ANY_MAP = new HashMap();
- public static final String STANDARD = "standard";
-
+ private static final String STANDARD = "standard";
private String _storePath;
private String _storeName;
private ConfiguredObjectRecordHandler _handler;
- private static final String ROUTING_KEY = "routingKey";
- private static final String QUEUE_NAME = "queueName";
private Map<String,Object> _bindingArgs;
private UUID _queueId;
private UUID _exchangeId;
@@ -89,6 +84,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private ConfiguredObjectRecord _rootRecord;
+ @Override
public void setUp() throws Exception
{
super.setUp();
@@ -102,7 +98,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
_handler = mock(ConfiguredObjectRecordHandler.class);
- _bindingArgs = new HashMap<String, Object>();
+ _bindingArgs = new HashMap<>();
String argKey = AMQPFilterTypes.JMS_SELECTOR.toString();
String argValue = "some selector expression";
_bindingArgs.put(argKey, argValue);
@@ -126,6 +122,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
protected abstract VirtualHostNode createVirtualHostNode(String storeLocation, ConfiguredObjectFactory factory);
+ @Override
public void tearDown() throws Exception
{
try
@@ -162,7 +159,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private Map<String,Object> map(Object... vals)
{
- Map<String,Object> map = new HashMap<String, Object>();
+ Map<String,Object> map = new HashMap<>();
boolean isValue = false;
String key = null;
for(Object obj : vals)
@@ -191,15 +188,6 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
-
- private ConfiguredObjectRecord matchesRecord(UUID id,
- String type,
- Map<String, Object> attributes,
- final Map<String, UUID> parents)
- {
- return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
- }
-
private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
@@ -227,7 +215,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
{
ConfiguredObjectRecord binding = (ConfiguredObjectRecord) argument;
- Map<String,Object> arg = new HashMap<String, Object>(binding.getAttributes());
+ Map<String,Object> arg = new HashMap<>(binding.getAttributes());
arg.remove("createdBy");
arg.remove("createdTime");
arg.remove("lastUpdatedTime");
@@ -258,7 +246,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
}
- public void testCreateQueueAMQQueue() throws Exception
+ public void testCreateQueue() throws Exception
{
Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, null);
_configStore.create(queue.asObjectRecord());
@@ -266,70 +254,17 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
_configStore.openConfigurationStore(_handler);
- Map<String, Object> queueAttributes = new HashMap<String, Object>();
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.put(Queue.TYPE, STANDARD);
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
-
- public void testCreateQueueAMQQueueFieldTable() throws Exception
- {
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- attributes.put(Queue.TYPE, STANDARD);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
-
- _configStore.create(queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
-
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.put(Queue.OWNER, getName()+"Owner");
- queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.putAll(attributes);
-
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
-
- public void testCreateQueueAMQQueueWithAlternateExchange() throws Exception
- {
- Exchange<?> alternateExchange = createTestAlternateExchange();
-
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, alternateExchange, null);
- _configStore.create(queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String, Object> queueAttributes = new HashMap<String, Object>();
+ Map<String, Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.put(Queue.OWNER, getName()+"Owner");
queueAttributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONTAINER.name());
- queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
queueAttributes.put(Queue.TYPE, STANDARD);
verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
}
- private Exchange<?> createTestAlternateExchange()
- {
- UUID exchUuid = UUID.randomUUID();
- Exchange<?> alternateExchange = mock(Exchange.class);
- when(alternateExchange.getId()).thenReturn(exchUuid);
- return alternateExchange;
- }
-
- public void testUpdateQueueExclusivity() throws Exception
+ public void testUpdateQueue() throws Exception
{
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
+ Map<String, Object> attributes = new HashMap<>();
attributes.put(Queue.TYPE, STANDARD);
Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
@@ -343,7 +278,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
reopenStore();
_configStore.openConfigurationStore(_handler);
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
+ Map<String,Object> queueAttributes = new HashMap<>();
queueAttributes.put(Queue.NAME, getName());
queueAttributes.putAll(attributes);
@@ -352,43 +287,12 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
}
- public void testUpdateQueueAlternateExchange() throws Exception
- {
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
- _configStore.create(queue.asObjectRecord());
-
- // update the queue to have exclusive=false
- Exchange<?> alternateExchange = createTestAlternateExchange();
- queue = createTestQueue(getName(), getName() + "Owner", false, alternateExchange, attributes);
-
- _configStore.update(false, queue.asObjectRecord());
-
- reopenStore();
- _configStore.openConfigurationStore(_handler);
-
- Map<String,Object> queueAttributes = new HashMap<String, Object>();
-
- queueAttributes.put(Queue.NAME, getName());
- queueAttributes.putAll(attributes);
- queueAttributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange.getId().toString());
- queueAttributes.put(Queue.TYPE, STANDARD);
- verify(_handler).handle(matchesRecord(_queueId, QUEUE, queueAttributes));
- }
public void testRemoveQueue() throws Exception
{
- // create queue
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, Boolean.TRUE);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 10);
- Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, attributes);
+ Queue<?> queue = createTestQueue(getName(), getName() + "Owner", true, Collections.emptyMap());
_configStore.create(queue.asObjectRecord());
- // remove queue
_configStore.remove(queue.asObjectRecord());
reopenStore();
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
@@ -399,36 +303,22 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
boolean exclusive,
final Map<String, Object> arguments) throws StoreException
{
- return createTestQueue(queueName, queueOwner, exclusive, null, arguments);
- }
-
- private Queue<?> createTestQueue(String queueName,
- String queueOwner,
- boolean exclusive,
- Exchange<?> alternateExchange,
- final Map<String, Object> arguments) throws StoreException
- {
Queue queue = BrokerTestHelper.mockWithSystemPrincipal(Queue.class, mock(Principal.class));
when(queue.getName()).thenReturn(queueName);
when(queue.isExclusive()).thenReturn(exclusive);
when(queue.getId()).thenReturn(_queueId);
when(queue.getType()).thenReturn(STANDARD);
- when(queue.getAlternateExchange()).thenReturn(alternateExchange);
- when(queue.getCategoryClass()).thenReturn((Class)Queue.class);
+ when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.isDurable()).thenReturn(true);
TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();
when(queue.getTaskExecutor()).thenReturn(taskExecutor);
when(queue.getChildExecutor()).thenReturn(taskExecutor);
- final VirtualHost vh = mock(VirtualHost.class);
+ final QueueManagingVirtualHost vh = mock(QueueManagingVirtualHost.class);
when(queue.getVirtualHost()).thenReturn(vh);
- final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<String, Object>() : new LinkedHashMap<String, Object>(arguments);
+ final Map<String,Object> attributes = arguments == null ? new LinkedHashMap<>() : new LinkedHashMap<>(arguments);
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.TYPE, STANDARD);
- if(alternateExchange != null)
- {
- attributes.put(Queue.ALTERNATE_EXCHANGE, alternateExchange);
- }
if(exclusive)
{
when(queue.getOwner()).thenReturn(queueOwner);
@@ -466,7 +356,7 @@ public abstract class AbstractDurableConfigurationStoreTestCase extends QpidTest
private Exchange<?> createTestExchange()
{
Exchange exchange = mock(Exchange.class);
- Map<String,Object> actualAttributes = new HashMap<String, Object>();
+ Map<String,Object> actualAttributes = new HashMap<>();
actualAttributes.put("name", getName());
actualAttributes.put("type", getName() + "Type");
actualAttributes.put("lifetimePolicy", LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
index 3c574b3..f55ff86 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
@@ -49,7 +49,7 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
_upgraderAndRecoverer = new VirtualHostStoreUpgraderAndRecoverer(_virtualHostNode);
}
- public void testUpgradeForFlowControlFrom_6_1() throws Exception
+ public void testUpgradeFlowControlFrom_6_1() throws Exception
{
Map<String, Object> rootAttributes = new HashMap<>();
rootAttributes.put("modelVersion", "6.1");
@@ -83,6 +83,122 @@ public class VirtualHostStoreUpgraderAndRecovererTest extends QpidTestCase
String.valueOf(upgradedAttributes.get("overflowPolicy")));
}
+ public void testUpgradeQueueAlternateExchangeFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+ Map<String, Object> queueAttributes = new HashMap<>();
+ queueAttributes.put("name", "queue");
+ queueAttributes.put("alternateExchange", "testExchange");
+
+ ConfiguredObjectRecord queueRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Queue", queueAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ final Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "testExchange");
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, queueRecord, exchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(queueRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded queue record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+
+ }
+
+ public void testUpgradeExchangeAlternateExchangeFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, Object> alternateExchangeAttributes = new HashMap<>();
+ alternateExchangeAttributes.put("name", "testExchange");
+ ConfiguredObjectRecord alternateExchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", alternateExchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "exchange");
+ exchangeAttributes.put("alternateExchange", "testExchange");
+
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, exchangeRecord, alternateExchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(exchangeRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded exchange record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+
+ }
+ public void testUpgradeExchangeAlternateExchangeSpecifiedWithUUIDFrom_6_1() throws Exception
+ {
+ Map<String, Object> rootAttributes = new HashMap<>();
+ rootAttributes.put("modelVersion", "6.1");
+ rootAttributes.put("name", "root");
+ ConfiguredObjectRecord rootRecord =
+ new ConfiguredObjectRecordImpl(UUID.randomUUID(), "VirtualHost", rootAttributes);
+
+ final Map<String, Object> alternateExchangeAttributes = new HashMap<>();
+ alternateExchangeAttributes.put("name", "testExchange");
+ UUID alternateExchangeId = UUID.randomUUID();
+ ConfiguredObjectRecord alternateExchangeRecord = new ConfiguredObjectRecordImpl(alternateExchangeId, "Exchange", alternateExchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+ Map<String, Object> exchangeAttributes = new HashMap<>();
+ exchangeAttributes.put("name", "exchange");
+ exchangeAttributes.put("alternateExchange", alternateExchangeId.toString());
+
+ ConfiguredObjectRecord exchangeRecord = new ConfiguredObjectRecordImpl(UUID.randomUUID(), "Exchange", exchangeAttributes,
+ Collections.singletonMap(rootRecord.getType(),
+ rootRecord.getId()));
+
+ List<ConfiguredObjectRecord> records = Arrays.asList(rootRecord, exchangeRecord, alternateExchangeRecord);
+ List<ConfiguredObjectRecord> upgradedRecords =
+ _upgraderAndRecoverer.upgrade(_store, records, "VirtualHost", "modelVersion");
+
+ ConfiguredObjectRecord upgradedQueueRecord = findRecordById(exchangeRecord.getId(), upgradedRecords);
+ assertNotNull("Upgraded exchange record not found ", upgradedQueueRecord);
+
+ Map<String, Object> upgradedAttributes = upgradedQueueRecord.getAttributes();
+ assertNotNull("Upgraded attributes not found", upgradedAttributes);
+
+ assertTrue("Attribute 'alternateBinding' was not added", upgradedAttributes.containsKey("alternateBinding"));
+ assertEquals("Unexpected alternateBinding",
+ new HashMap<>(Collections.singletonMap("destination", "testExchange")),
+ new HashMap<>(((Map<String, String>) upgradedAttributes.get("alternateBinding"))));
+ assertFalse("Attribute 'alternateExchange' was not removed", upgradedAttributes.containsKey("alternateExchange"));
+ }
+
private ConfiguredObjectRecord findRecordById(UUID id, List<ConfiguredObjectRecord> records)
{
for (ConfiguredObjectRecord record : records)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index 8493a03..f5fc87b 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
@@ -37,8 +36,6 @@ import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
-import org.apache.qpid.server.model.Exchange;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
@@ -110,13 +107,14 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
super.tearDown();
}
}
+
private VirtualHost<?> createHost()
{
- Map<String, Object> attributes = new HashMap<String, Object>();
+ Map<String, Object> attributes = new HashMap<>();
attributes.put(VirtualHost.NAME, getName());
attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
- attributes = new HashMap<String, Object>(attributes);
+ attributes = new HashMap<>(attributes);
attributes.put(VirtualHost.ID, UUID.randomUUID());
TestMemoryVirtualHost host = new TestMemoryVirtualHost(attributes, _virtualHostNode);
host.create();
@@ -124,20 +122,9 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
return host;
}
- private void verifyRegisteredQueueCount(int count)
- {
- assertEquals("Queue was not registered in virtualhost", count, _virtualHost.getChildren(Queue.class).size());
- }
-
-
- private void verifyQueueRegistered(String queueName)
- {
- assertNotNull("Queue " + queueName + " was not created", _virtualHost.getChildByName(Queue.class, queueName));
- }
-
public void testPriorityQueueRegistration() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testPriorityQueue");
@@ -147,304 +134,47 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertTrue("Queue not a priority queue", queue instanceof PriorityQueueImpl);
- verifyQueueRegistered("testPriorityQueue");
- verifyRegisteredQueueCount(1);
+ assertNotNull("Queue " + "testPriorityQueue" + " was not created", _virtualHost.getChildByName(Queue.class,
+ "testPriorityQueue"));
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
-
- public void testSimpleQueueRegistration() throws Exception
+ public void testSimpleQueueCreation() throws Exception
{
String queueName = getName();
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertTrue("Queue not a simple queue", queue instanceof StandardQueueImpl);
- verifyQueueRegistered(queueName);
-
- //verify that no alternate exchange or DLQ were produced
-
- assertNull("Queue should not have an alternate exchange as DLQ wasn't enabled", queue.getAlternateExchange());
- assertNull("The DLQ should not exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
-
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true does
- * cause the alternate exchange to be set and DLQ to be produced.
- */
- public void testDeadLetterQueueEnabled() throws Exception
- {
-
- String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not yet exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String,Object> attributes = new HashMap<String, Object>();
-
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- Exchange<?> altExchange = queue.getAlternateExchange();
- assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
-
- assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Queue<?> dlQueue = (Queue<?>) _virtualHost.getChildByName(Queue.class, dlQueueName);
- assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
- assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
- assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
-
- //2 queues should have been registered
- verifyRegisteredQueueCount(2);
- }
-
- /**
- * Tests that the deadLetterQueues/maximumDeliveryCount settings from the configuration
- * are not applied to the DLQ itself.
- */
- public void testDeadLetterQueueDoesNotInheritDLQorMDCSettings() throws Exception
- {
-
- String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not yet exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertEquals("Unexpected maximum delivery count", 5, queue.getMaximumDeliveryAttempts());
- Exchange<?> altExchange = queue.getAlternateExchange();
- assertNotNull("Queue should have an alternate exchange as DLQ is enabled", altExchange);
- assertEquals("Alternate exchange name was not as expected", dlExchangeName, altExchange.getName());
- assertEquals("Alternate exchange type was not as expected", ExchangeDefaults.FANOUT_EXCHANGE_CLASS, altExchange.getType());
-
- assertNotNull("The alternate exchange was not registered as expected", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertEquals("The registered exchange was not the expected exchange instance", altExchange, _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Queue<?> dlQueue = (Queue<?>) _virtualHost.getChildByName(Queue.class, dlQueueName);
- assertNotNull("The DLQ was not registered as expected", dlQueue);
- assertTrue("DLQ should have been bound to the alternate exchange", altExchange.isBound(dlQueue));
- assertNull("DLQ should have no alternate exchange", dlQueue.getAlternateExchange());
- assertEquals("DLQ should have a zero maximum delivery count", 0, dlQueue.getMaximumDeliveryAttempts());
-
- //2 queues should have been registered
- verifyRegisteredQueueCount(2);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument false does not
- * result in the alternate exchange being set and DLQ being created.
- */
- public void testDeadLetterQueueDisabled() throws Exception
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
-
-
- String queueName = "testDeadLetterQueueDisabled";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not exist", _virtualHost.getChildByName(Exchange.class,
- dlExchangeName));
-
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, false);
-
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertNull("Queue should not have an alternate exchange as DLQ is disabled", queue.getAlternateExchange());
- assertNull("The alternate exchange should still not exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
- assertNull("The DLQ should still not exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
-
- //only 1 queue should have been registered
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_DLQ_ENABLED} argument true but
- * creating an auto-delete queue, does not result in the alternate exchange
- * being set and DLQ being created.
- */
- public void testDeadLetterQueueNotCreatedForAutodeleteQueues() throws Exception
- {
-
- String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
- String dlExchangeName = queueName + QueueManagingVirtualHost.DEFAULT_DLE_NAME_SUFFIX;
- String dlQueueName = queueName + AbstractVirtualHost.DEFAULT_DLQ_NAME_SUFFIX;
-
- assertNull("The DLQ should not yet exist", _virtualHost.getChildByName(Queue.class, dlQueueName));
- assertNull("The alternate exchange should not exist", _virtualHost.getChildByName(Exchange.class, dlExchangeName));
-
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
- attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
-
- //create an autodelete queue
- Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
- assertEquals("Queue should be autodelete",
- LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS,
- queue.getLifetimePolicy());
-
- //ensure that the autodelete property overrides the request to enable DLQ
- assertNull("Queue should not have an alternate exchange as queue is autodelete", queue.getAlternateExchange());
- assertNull("The alternate exchange should not exist as queue is autodelete", _virtualHost.getChildByName( Exchange.class, dlExchangeName));
- assertNull("The DLQ should not exist as queue is autodelete", _virtualHost.getChildByName(Queue.class, dlQueueName));
+ assertNotNull("Queue " + queueName + " was not created", _virtualHost.getChildByName(Queue.class, queueName));
- //only 1 queue should have been registered
- verifyRegisteredQueueCount(1);
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
- /**
- * Tests that setting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument has
- * the desired effect.
- */
public void testMaximumDeliveryCount() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "testMaximumDeliveryCount");
- attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, (Object) 5);
+ attributes.put(Queue.MAXIMUM_DELIVERY_ATTEMPTS, 5);
final Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertNotNull("The queue was not registered as expected ", queue);
assertEquals("Maximum delivery count not as expected", 5, queue.getMaximumDeliveryAttempts());
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests that omitting the {@link org.apache.qpid.server.queue.QueueArgumentsConverter#X_QPID_MAXIMUM_DELIVERY_COUNT} argument means
- * that queue is created with a default maximumDeliveryCount of zero (unless set in config).
- */
- public void testMaximumDeliveryCountDefault() throws Exception
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, "testMaximumDeliveryCountDefault");
-
- final Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
-
- assertNotNull("The queue was not registered as expected ", queue);
- assertEquals("Maximum delivery count not as expected", 0, queue.getMaximumDeliveryAttempts());
-
- verifyRegisteredQueueCount(1);
- }
-
- /**
- * Tests queue creation with queue name set to null
- */
- public void testQueueNameNullValidation()
- {
- try
- {
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with null name can not be created!");
- }
- catch (Exception e)
- {
- assertTrue(e instanceof IllegalArgumentException);
- assertTrue(e.getMessage().startsWith("The name attribute is mandatory"));
- }
- }
-
- /**
- * Tests queue creation with queue name length less 255 characters but
- * corresponding DLQ name length greater than 255.
- */
- public void testQueueNameWithLengthLessThan255ButDLQNameWithLengthGreaterThan255()
- {
- String queueName = "test-" + generateStringWithLength('a', 245);
- try
- {
- // change DLQ name to make its length bigger than exchange name
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLE");
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQUEUE");
-
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, true);
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with DLQ name having more than 255 characters can not be created!");
- }
- catch (Exception e)
- {
- assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
- assertTrue("Unexpected exception message!", e.getMessage().contains("DLQ queue name")
- && e.getMessage().contains("length exceeds limit of 255"));
- }
- }
-
- /**
- * Tests queue creation with queue name length less 255 characters but
- * corresponding DL exchange name length greater than 255.
- */
- public void testQueueNameWithLengthLessThan255ButDLExchangeNameWithLengthGreaterThan255()
- {
- String queueName = "test-" + generateStringWithLength('a', 245);
- try
- {
- // change DLQ name to make its length bigger than exchange name
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, "_DLEXCHANGE");
- setTestSystemProperty(QueueManagingVirtualHost.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, "_DLQ");
-
- Map<String,Object> attributes = new HashMap<String, Object>();
- attributes.put(Queue.ID, UUID.randomUUID());
- attributes.put(Queue.NAME, queueName);
-
- attributes.put(AbstractVirtualHost.CREATE_DLQ_ON_CREATION, (Object) true);
-
- _virtualHost.createChild(Queue.class, attributes);
- fail("queue with DLE name having more than 255 characters can not be created!");
- }
- catch (Exception e)
- {
- assertTrue("Unexpected exception is thrown!", e instanceof IllegalArgumentException);
- assertTrue("Unexpected exception message!", e.getMessage().contains("DL exchange name")
- && e.getMessage().contains("length exceeds limit of 255"));
- }
+ assertEquals("Queue was not registered in virtualhost", 1, _virtualHost.getChildren(Queue.class).size());
}
public void testMessageGroupQueue() throws Exception
{
- Map<String,Object> attributes = new HashMap<String, Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
@@ -455,15 +185,5 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
}
- private String generateStringWithLength(char ch, int length)
- {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++)
- {
- sb.append(ch);
- }
- return sb.toString();
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
----------------------------------------------------------------------
diff --git a/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java b/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
index 2d764c9..68bbf8e 100644
--- a/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
+++ b/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapter.java
@@ -262,13 +262,14 @@ class LegacyAccessControlAdapter
properties.put(ObjectProperties.Property.TEMPORARY, lifeTimePolicy != LifetimePolicy.PERMANENT);
properties.put(ObjectProperties.Property.DURABLE, (Boolean)queue.getAttribute(ConfiguredObject.DURABLE));
properties.put(ObjectProperties.Property.EXCLUSIVE, queue.getAttribute(Queue.EXCLUSIVE) != ExclusivityPolicy.NONE);
- Object alternateExchange = queue.getAttribute(Queue.ALTERNATE_EXCHANGE);
- if (alternateExchange != null)
+ Object alternateBinding = queue.getAttribute(Queue.ALTERNATE_BINDING);
+ if (alternateBinding instanceof AlternateBinding)
{
- String name = alternateExchange instanceof ConfiguredObject ?
- (String)((ConfiguredObject)alternateExchange).getAttribute(ConfiguredObject.NAME) :
- String.valueOf(alternateExchange);
- properties.put(ObjectProperties.Property.ALTERNATE, name);
+ String name = ((AlternateBinding)alternateBinding).getDestination();
+ if (name != null && !"".equals(name))
+ {
+ properties.put(ObjectProperties.Property.ALTERNATE, name);
+ }
}
String owner = (String)queue.getAttribute(Queue.OWNER);
if (owner != null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
----------------------------------------------------------------------
diff --git a/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java b/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
index 648154e..1985035 100644
--- a/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
+++ b/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/config/LegacyAccessControlAdapterTest.java
@@ -35,6 +35,7 @@ import java.util.Map;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.queue.QueueConsumer;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
public class LegacyAccessControlAdapterTest extends QpidTestCase
@@ -45,7 +46,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
private static final String TEST_QUEUE = "testQueue";
private LegacyAccessControl _accessControl;
- private VirtualHost<?> _virtualHost;
+ private QueueManagingVirtualHost<?> _virtualHost;
private Broker _broker;
private VirtualHostNode<?> _virtualHostNode;
private LegacyAccessControlAdapter _adapter;
@@ -55,7 +56,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
{
super.setUp();
_accessControl = mock(LegacyAccessControl.class);
- _virtualHost = mock(VirtualHost.class);
+ _virtualHost = mock(QueueManagingVirtualHost.class);
when(_virtualHost.getName()).thenReturn(TEST_VIRTUAL_HOST);
@@ -260,7 +261,7 @@ public class LegacyAccessControlAdapterTest extends QpidTestCase
when(queue.getAttribute(Queue.OWNER)).thenReturn(null);
when(queue.getAttribute(Queue.EXCLUSIVE)).thenReturn(ExclusivityPolicy.NONE);
when(queue.getAttribute(Queue.DURABLE)).thenReturn(false);
- when(queue.getAttribute(Queue.ALTERNATE_EXCHANGE)).thenReturn(null);
+ when(queue.getAttribute(Queue.ALTERNATE_BINDING)).thenReturn(null);
when(queue.getCategoryClass()).thenReturn(Queue.class);
when(queue.getParent()).thenReturn(vh);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index b360249..0d8210b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
import org.apache.qpid.server.message.MessageInstance.EntryState;
@@ -473,12 +474,12 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
if(owningResource instanceof Queue)
{
final Queue<?> queue = (Queue<?>)owningResource;
- final Exchange alternateExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if(alternateExchange != null)
+ if(alternateBindingDestination != null)
{
getEventLogger().message(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- alternateExchange.getName()));
+ alternateBindingDestination.getName()));
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index c34740e..1c85614 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -23,19 +23,20 @@ package org.apache.qpid.server.protocol.v0_10;
import java.nio.charset.StandardCharsets;
import java.security.AccessControlException;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import com.google.common.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
@@ -52,6 +53,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -79,7 +81,7 @@ import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -861,6 +863,7 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
return;
}
}
+ String alternateExchangeName = method.getAlternateExchange();
if(nameNullOrEmpty(method.getExchange()))
{
// special case handling to fake the existence of the default exchange for 0-10
@@ -871,11 +874,11 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
+ " of type " + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ " to " + method.getType() +".");
}
- if(!nameNullOrEmpty(method.getAlternateExchange()))
+ if(!nameNullOrEmpty(alternateExchangeName))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
"Attempt to set alternate exchange of the default exchange "
- + " to " + method.getAlternateExchange() +".");
+ + " to " + alternateExchangeName + ".");
}
}
else
@@ -911,7 +914,13 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, method.getDurable());
attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
method.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, method.getAlternateExchange());
+ if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
+ {
+ validateAlternateExchangeIsNotQueue(addressSpace, alternateExchangeName);
+
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
+ }
addressSpace.createMessageDestination(Exchange.class, attributes);;
}
catch(ReservedExchangeNameException e)
@@ -919,8 +928,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
Exchange<?> existingExchange = getExchange(session, exchangeName);
if(existingExchange == null
|| !existingExchange.getType().equals(method.getType())
- || (method.hasAlternateExchange() && (existingExchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(existingExchange.getAlternateExchange().getName()))) )
+ || (method.hasAlternateExchange() && (existingExchange.getAlternateBinding() == null ||
+ !alternateExchangeName
+ .equals(existingExchange.getAlternateBinding().getDestination()))) )
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ exchangeName + " which begins with reserved name or prefix.");
@@ -947,21 +957,23 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
+ " to " + method.getType() +".");
}
else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ && (exchange.getAlternateBinding() == null ||
+ !alternateExchangeName.equals(exchange.getAlternateBinding().getDestination())))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateBinding()
+ + " to " + alternateExchangeName + ".");
}
}
catch (AccessControlException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
-
-
+ catch (IllegalConfigurationException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, e.getMessage());
+ }
}
}
}
@@ -1095,9 +1107,9 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exchange.delete();
}
- catch (ExchangeIsAlternateException e)
+ catch (MessageDestinationIsAlternateException e)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange in use as an alternate binding destination");
}
catch (RequiredExchangeException e)
{
@@ -1518,20 +1530,17 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
final String alternateExchangeName = method.getAlternateExchange();
+ final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(queueName,
+ method.getArguments());
- final Map<String, Object> arguments = QueueArgumentsConverter.convertWireArgsToModel(method.getArguments());
-
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ if (method.hasAlternateExchange() && !nameNullOrEmpty(alternateExchangeName))
{
- arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName);
+ validateAlternateExchangeIsNotQueue(addressSpace, alternateExchangeName);
+ arguments.put(Queue.ALTERNATE_BINDING, Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
}
- final UUID id = UUID.randomUUID();
-
- arguments.put(Queue.ID, id);
arguments.put(Queue.NAME, queueName);
-
if(!arguments.containsKey(Queue.LIFETIME_POLICY))
{
LifetimePolicy lifetime;
@@ -1578,6 +1587,20 @@ public class ServerSessionDelegate extends MethodDelegate<ServerSession> impleme
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
+ catch (IllegalConfigurationException e)
+ {
+ exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, e.getMessage());
+ }
+ }
+ }
+
+ private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
+ {
+ MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
+ if (alternateMessageDestination != null && !(alternateMessageDestination instanceof Exchange))
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Alternate exchange '%s' is not a destination of type 'exchange'.", alternateExchangeName));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 61c8c26..937e1d0 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -52,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.ErrorCodes;
@@ -91,7 +92,7 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.MessageDestinationIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -113,6 +114,7 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
return input.getMessageInstance();
}
};
+ private static final String ALTERNATE_EXCHANGE = "alternateExchange";
private final DefaultQueueAssociationClearingTask
_defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
@@ -1638,16 +1640,10 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
{
final Queue<?> queue = (Queue<?>) owningResource;
- final Exchange altExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if (altExchange == null)
+ if (alternateBindingDestination == null)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: "
- + deliveryTag);
- }
messageWithSubject(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(),
queue.getName(),
msg.getInitialRoutingAddress()));
@@ -1655,14 +1651,8 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
else
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- }
messageWithSubject(ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(),
- altExchange.getName()));
+ alternateBindingDestination.getName()));
}
}
}
@@ -2726,9 +2716,13 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
attributes.put(Exchange.DURABLE, durable);
attributes.put(Exchange.LIFETIME_POLICY,
autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
- if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+
+ Object alternateExchange = attributes.remove(ALTERNATE_EXCHANGE);
+ if (alternateExchange != null)
{
- attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+ validateAlternateExchangeIsNotQueue(virtualHost, String.valueOf(alternateExchange));
+ attributes.put(Exchange.ALTERNATE_BINDING,
+ Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchange));
}
exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
@@ -2810,6 +2804,16 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
}
+ private void validateAlternateExchangeIsNotQueue(final NamedAddressSpace addressSpace, final String alternateExchangeName)
+ {
+ MessageDestination alternateMessageDestination = addressSpace.getAttainedMessageDestination(alternateExchangeName, false);
+ if (alternateMessageDestination != null && !(alternateMessageDestination instanceof Exchange))
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Alternate exchange '%s' is not a destination of type 'exchange'.", alternateExchangeName));
+ }
+ }
+
@Override
public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
{
@@ -2858,9 +2862,9 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
}
}
- catch (ExchangeIsAlternateException e)
+ catch (MessageDestinationIsAlternateException e)
{
- closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ closeChannel(ErrorCodes.NOT_ALLOWED, "Exchange in use as an alternate binding destination");
}
catch (RequiredExchangeException e)
{
@@ -3064,9 +3068,17 @@ public class AMQChannel extends AbstractAMQPSession<AMQChannel, ConsumerTarget_0
try
{
- Map<String, Object> attributes =
- QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
final String queueNameString = AMQShortString.toString(queueName);
+ Map<String, Object> wireArguments = FieldTable.convertToMap(arguments);
+ Object alternateExchange = wireArguments.get(ALTERNATE_EXCHANGE);
+ if (alternateExchange != null)
+ {
+ validateAlternateExchangeIsNotQueue(virtualHost, String.valueOf(alternateExchange));
+ }
+
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments);
+
attributes.put(Queue.NAME, queueNameString);
attributes.put(Queue.DURABLE, durable);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index beb1a78..a535fd2 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
@@ -601,9 +602,9 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
final Queue<?> queue = (Queue<?>) owningResource;
- final Exchange altExchange = queue.getAlternateExchange();
+ final MessageDestination alternateBindingDestination = queue.getAlternateBindingDestination();
- if (altExchange == null)
+ if (alternateBindingDestination == null)
{
eventLogger.message(logSubject,
ChannelMessages.DISCARDMSG_NOALTEXCH(message.getMessageNumber(),
@@ -614,7 +615,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
eventLogger.message(logSubject,
ChannelMessages.DISCARDMSG_NOROUTE(message.getMessageNumber(),
- altExchange.getName()));
+ alternateBindingDestination.getName()));
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e4598dcd/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 48af203..a98c8e6 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -41,6 +41,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.exchange.DestinationReferrer;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
@@ -67,8 +68,8 @@ import org.apache.qpid.server.txn.DtxNotSupportedException;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
-import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
public class ManagementAddressSpace implements NamedAddressSpace
@@ -167,6 +168,12 @@ public class ManagementAddressSpace implements NamedAddressSpace
return null;
}
+ @Override
+ public MessageDestination getAttainedMessageDestination(final String name, final boolean mayCreate)
+ {
+ return getAttainedMessageDestination(name);
+ }
+
ProxyMessageSource getProxyNode(final String name)
{
LOGGER.debug("RG: looking for proxy source {}", name);
@@ -413,5 +420,21 @@ public class ManagementAddressSpace implements NamedAddressSpace
{
}
+
+ @Override
+ public MessageDestination getAlternateBindingDestination()
+ {
+ return null;
+ }
+
+ @Override
+ public void removeReference(final DestinationReferrer destinationReferrer)
+ {
+ }
+
+ @Override
+ public void addReference(final DestinationReferrer destinationReferrer)
+ {
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org