You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2017/09/30 20:31:50 UTC
[4/4] qpid-broker-j git commit: QPID-7937 : Define message grouping
policy with an enum (default NONE),
if messageGroupKey is not supplied in Queue attributes,
use the message property "group-id"
QPID-7937 : Define message grouping policy with an enum (default NONE). If messageGroupKey is not supplied in the Queue attributes, use the message property "group-id" to determine the group.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3745dd95
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3745dd95
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3745dd95
Branch: refs/heads/master
Commit: 3745dd952e998b59f27699ad208a80a7c9ffe90e
Parents: 9d5a825
Author: rgodfrey <rg...@apache.org>
Authored: Sat Sep 30 22:10:58 2017 +0200
Committer: rgodfrey <rg...@apache.org>
Committed: Sat Sep 30 22:30:17 2017 +0200
----------------------------------------------------------------------
.../qpid/server/message/AMQMessageHeader.java | 2 +
.../message/internal/InternalMessageHeader.java | 7 ++
.../org/apache/qpid/server/model/Queue.java | 18 ++-
.../apache/qpid/server/queue/AbstractQueue.java | 31 +++--
.../AssignedConsumerMessageGroupManager.java | 15 ++-
.../queue/DefinedGroupMessageGroupManager.java | 25 +++--
.../qpid/server/queue/MessageGroupType.java | 28 +++++
.../server/queue/QueueArgumentsConverter.java | 75 +++++++------
.../VirtualHostStoreUpgraderAndRecoverer.java | 56 ++++++++++
.../server/virtualhost/AbstractVirtualHost.java | 7 ++
.../server/exchange/HeadersBindingTest.java | 7 +-
.../VirtualHostQueueCreationTest.java | 5 +-
.../MessageConverter_v0_10_to_Internal.java | 6 +
.../protocol/v0_10/MessageTransferHeader.java | 8 +-
.../v0_8/MessageConverter_v0_8_to_Internal.java | 7 +-
.../server/protocol/v0_8/MessageMetaData.java | 7 ++
.../protocol/v1_0/MessageMetaData_1_0.java | 13 +++
.../server/management/amqp/ManagementNode.java | 6 +
.../src/main/java/resources/addQueue.html | 25 +++--
.../java/resources/js/qpid/management/Queue.js | 8 +-
.../resources/js/qpid/management/addQueue.js | 13 ++-
.../src/main/java/resources/showQueue.html | 16 +--
.../org/apache/qpid/systest/rest/Asserts.java | 2 +-
.../server/queue/MessageGroupQueueTest.java | 112 ++++++++++---------
24 files changed, 341 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
index a920d89..c7d28d1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
@@ -33,6 +33,8 @@ public interface AMQMessageHeader
String getAppId();
+ String getGroupId();
+
String getMessageId();
String getMimeType();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
index 374e3df..a353a93 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessageHeader.java
@@ -132,6 +132,13 @@ public final class InternalMessageHeader implements AMQMessageHeader, Serializab
}
@Override
+ public String getGroupId()
+ {
+ final Object jmsxGroupId = _headers.get("JMSXGroupID");
+ return jmsxGroupId == null ? null : String.valueOf(jmsxGroupId);
+ }
+
+ @Override
public String getMessageId()
{
return _messageId;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
index 855603e..e48c54a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -33,13 +33,7 @@ import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.CreatingLinkInfo;
-import org.apache.qpid.server.queue.NotificationCheck;
-import org.apache.qpid.server.queue.QueueConsumer;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.QueueEntryIterator;
-import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.util.Deletable;
@@ -70,7 +64,7 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
String EXCLUSIVE = "exclusive";
String MESSAGE_DURABILITY = "messageDurability";
String MESSAGE_GROUP_KEY = "messageGroupKey";
- String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
+ String MESSAGE_GROUP_TYPE = "messageGroupType";
String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
String NO_LOCAL = "noLocal";
@@ -159,8 +153,12 @@ public interface Queue<X extends Queue<X>> extends ConfiguredObject<X>,
@ManagedAttribute( defaultValue = "${queue.maximumDistinctGroups}")
int getMaximumDistinctGroups();
- @ManagedAttribute
- boolean isMessageGroupSharedGroups();
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = "queue.messageGroupType")
+ MessageGroupType DEFAULT_MESSAGE_GROUP_TYPE = MessageGroupType.NONE;
+
+ @ManagedAttribute( defaultValue = "${queue.messageGroupType}")
+ MessageGroupType getMessageGroupType();
@SuppressWarnings("unused")
@ManagedContextDefault( name = "queue.maximumDeliveryAttempts")
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 565a0dd..9b56cbc 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -133,8 +133,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private static final Logger _logger = LoggerFactory.getLogger(AbstractQueue.class);
- public static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
-
private static final QueueNotificationListener NULL_NOTIFICATION_LISTENER = new QueueNotificationListener()
{
@Override
@@ -229,6 +227,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
@ManagedAttributeField
private boolean _messageGroupSharedGroups;
@ManagedAttributeField
+ private MessageGroupType _messageGroupType;
+ @ManagedAttributeField
private String _messageGroupDefaultGroup;
@ManagedAttributeField
private int _maximumDistinctGroups;
@@ -486,21 +486,20 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
getEventLogger().message(_logSubject,
getCreatedLogMessage());
- if(getMessageGroupKey() != null)
+ switch(getMessageGroupType())
{
- if(isMessageGroupSharedGroups())
- {
+ case NONE:
+ _messageGroupManager = null;
+ break;
+ case STANDARD:
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups());
+ break;
+ case SHARED_GROUPS:
_messageGroupManager =
new DefinedGroupMessageGroupManager(getMessageGroupKey(), getMessageGroupDefaultGroup(), this);
- }
- else
- {
- _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKey(), getMaximumDistinctGroups());
- }
- }
- else
- {
- _messageGroupManager = null;
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown messageGroupType type " + _messageGroupType);
}
_mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
@@ -3048,9 +3047,9 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
@Override
- public boolean isMessageGroupSharedGroups()
+ public MessageGroupType getMessageGroupType()
{
- return _messageGroupSharedGroups;
+ return _messageGroupType;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
index 38eac28..96671e5 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.message.AMQMessageHeader;
+
public class AssignedConsumerMessageGroupManager implements MessageGroupManager
{
@@ -56,7 +58,8 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
@Override
public boolean mightAssign(final QueueEntry entry, QueueConsumer sub)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+ Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
if(groupVal == null)
{
@@ -77,7 +80,8 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
private boolean assignMessage(QueueConsumer<?,?> sub, QueueEntry entry)
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+ Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
if(groupVal == null)
{
return true;
@@ -132,13 +136,14 @@ public class AssignedConsumerMessageGroupManager implements MessageGroupManager
return false;
}
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
- if(groupId == null)
+ final AMQMessageHeader messageHeader = entry.getMessage().getMessageHeader();
+ Object groupVal = _groupId == null ? messageHeader.getGroupId() : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
{
return false;
}
- Integer group = groupId.hashCode() & _groupMask;
+ Integer group = groupVal.hashCode() & _groupMask;
QueueConsumer<?,?> assignedSub = _groupMap.get(group);
if(assignedSub == _sub)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
index c8e790e..06803a7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
@@ -20,21 +20,24 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
-import org.apache.qpid.server.message.MessageInstance.EntryState;
-import org.apache.qpid.server.util.StateChangeListener;
-
-import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.message.ServerMessage;
-
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstance.ConsumerAcquiredState;
+import org.apache.qpid.server.message.MessageInstance.EntryState;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.StateChangeListener;
+
public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
private final String _groupId;
private final String _defaultGroup;
private final Map<Object, Group> _groupMap = new HashMap<>();
@@ -250,7 +253,11 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
ServerMessage message = entry.getMessage();
AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
- Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ Object groupVal = messageHeader == null
+ ? _defaultGroup
+ : _groupId == null
+ ? messageHeader.getGroupId()
+ : messageHeader.getHeader(_groupId);
if(groupVal == null)
{
groupVal = _defaultGroup;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
new file mode 100644
index 0000000..f28affd
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+public enum MessageGroupType
+{
+ NONE,
+ STANDARD,
+ SHARED_GROUPS
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
index aaf9bd1..0c6deb9 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
@@ -38,56 +38,57 @@ public class QueueArgumentsConverter
{
private static final Logger LOGGER = LoggerFactory.getLogger(QueueArgumentsConverter.class);
- public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
- public static final String X_QPID_CAPACITY = "x-qpid-capacity";
- public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
- public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
- public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
- public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
- public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
-
- public static final String QPID_ALERT_COUNT = "qpid.alert_count";
- public static final String QPID_ALERT_SIZE = "qpid.alert_size";
- public static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
+ private static final String SHARED_MSG_GROUP_ARG_VALUE = "1";
+ private static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ private static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ private static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ private static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ private static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ private static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ private static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
+ private static final String QPID_ALERT_COUNT = "qpid.alert_count";
+ private static final String QPID_ALERT_SIZE = "qpid.alert_size";
+ private static final String QPID_ALERT_REPEAT_GAP = "qpid.alert_repeat_gap";
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
- public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
+ private static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
- public static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
- public static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
- public static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
- public static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
- public static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
- public static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
+ private static final String QPID_QUEUE_SORT_KEY = "qpid.queue_sort_key";
+ static final String X_QPID_DLQ_ENABLED = "x-qpid-dlq-enabled";
+ private static final String X_QPID_MAXIMUM_DELIVERY_COUNT = "x-qpid-maximum-delivery-count";
+ static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ private static final String QPID_DEFAULT_MESSAGE_GROUP_ARG = "qpid.default-message-group";
- public static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
+ private static final String QPID_MESSAGE_DURABILITY = "qpid.message_durability";
- public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
+ private static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
- public static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
+ private static final String QPID_DEFAULT_FILTERS = "qpid.default_filters";
- public static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
+ private static final String QPID_ENSURE_NONDESTRUCTIVE_CONSUMERS = "qpid.ensure_nondestructive_consumers";
- public static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
- public static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
+ private static final String QPID_EXCLUSIVITY_POLICY = "qpid.exclusivity_policy";
+ private static final String QPID_LIFETIME_POLICY = "qpid.lifetime_policy";
- public static final String QPID_POLICY_TYPE = "qpid.policy_type";
- public static final String QPID_MAX_COUNT = "qpid.max_count";
- public static final String QPID_MAX_SIZE = "qpid.max_size";
+ private static final String QPID_POLICY_TYPE = "qpid.policy_type";
+ private static final String QPID_MAX_COUNT = "qpid.max_count";
+ private static final String QPID_MAX_SIZE = "qpid.max_size";
/**
* No-local queue argument is used to support the no-local feature of Durable Subscribers.
*/
- public static final String QPID_NO_LOCAL = "no-local";
+ private static final String QPID_NO_LOCAL = "no-local";
- static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
+ private static final Map<String, String> ATTRIBUTE_MAPPINGS = new LinkedHashMap<String, String>();
private static final String ALTERNATE_EXCHANGE = "alternateExchange";
private static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
- private static String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
+ private static final String PROPERTY_DEAD_LETTER_QUEUE_SUFFIX = "qpid.broker_dead_letter_queue_suffix";
static
{
@@ -150,10 +151,14 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
}
- if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
+ if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)
+ && SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))))
{
- modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
- AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP))));
+ modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
+ }
+ else if(wireArguments.containsKey(QPID_GROUP_HEADER_KEY))
+ {
+ modelArguments.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.STANDARD);
}
@@ -224,9 +229,9 @@ public class QueueArgumentsConverter
}
}
- if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS)))
+ if(MessageGroupType.SHARED_GROUPS.equals(modelArguments.get(Queue.MESSAGE_GROUP_TYPE)))
{
- wireArguments.put(QPID_SHARED_MSG_GROUP, AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE);
+ wireArguments.put(QPID_SHARED_MSG_GROUP, SHARED_MSG_GROUP_ARG_VALUE);
}
return wireArguments;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
index 1572600..f53fa24 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
@@ -745,6 +745,25 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
attributes.remove("bindings");
}
+ if(attributes.containsKey("messageGroupKey"))
+ {
+ if(attributes.containsKey("messageGroupSharedGroups")
+ && convertAttributeValueToBoolean("messageGroupSharedGroups",
+ attributes.remove("messageGroupSharedGroups")))
+ {
+ attributes.put("messageGroupType", "SHARED_GROUPS");
+
+ }
+ else
+ {
+ attributes.put("messageGroupType", "STANDARD");
+ }
+ }
+ else
+ {
+ attributes.put("messageGroupType", "NONE");
+ }
+
_queues.put(record.getId(), (String) attributes.get("name"));
if (!attributes.equals(new HashMap<>(record.getAttributes())) || addToUpdateMap)
@@ -788,6 +807,43 @@ public class VirtualHostStoreUpgraderAndRecoverer extends AbstractConfigurationS
return value;
}
+ private boolean convertAttributeValueToBoolean(final String attributeName,
+ final Object attributeValue)
+ {
+ boolean value;
+ if (attributeValue instanceof Boolean)
+ {
+ value = (Boolean) attributeValue;
+ }
+ else if (attributeValue instanceof String)
+ {
+ String strValue = (String)attributeValue;
+ if(strValue.equalsIgnoreCase("true"))
+ {
+ value = true;
+ }
+ else if(strValue.equalsIgnoreCase("false"))
+ {
+ value = false;
+ }
+ else
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot evaluate '%s': %s",
+ attributeName, attributeValue));
+ }
+
+ }
+ else
+ {
+ throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s",
+ attributeName,
+ String.valueOf(attributeValue)));
+ }
+ return value;
+ }
+
+
@Override
public void complete()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 230638c..4126c84 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1800,6 +1800,13 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
@Override
+ public String getGroupId()
+ {
+ Object jmsXGroupId = getHeader("JMSXGroupID");
+ return jmsXGroupId == null ? null : String.valueOf(jmsXGroupId);
+ }
+
+ @Override
public String getMessageId()
{
return _message.getMessageId();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
index 5f5e573..68152d3 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
@@ -28,7 +28,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
@@ -75,6 +74,12 @@ public class HeadersBindingTest extends QpidTestCase
}
@Override
+ public String getGroupId()
+ {
+ return null;
+ }
+
+ @Override
public String getMessageId()
{
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
index f5fc87b..0c591da 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/VirtualHostQueueCreationTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.queue.MessageGroupType;
import org.apache.qpid.server.queue.PriorityQueue;
import org.apache.qpid.server.queue.PriorityQueueImpl;
import org.apache.qpid.server.queue.StandardQueueImpl;
@@ -178,11 +179,11 @@ public class VirtualHostQueueCreationTest extends QpidTestCase
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, getTestName());
attributes.put(Queue.MESSAGE_GROUP_KEY,"mykey");
- attributes.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, true);
+ attributes.put(Queue.MESSAGE_GROUP_TYPE, MessageGroupType.SHARED_GROUPS);
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertEquals("mykey", queue.getAttribute(Queue.MESSAGE_GROUP_KEY));
- assertEquals(Boolean.TRUE, queue.getAttribute(Queue.MESSAGE_GROUP_SHARED_GROUPS));
+ assertEquals(MessageGroupType.SHARED_GROUPS, queue.getAttribute(Queue.MESSAGE_GROUP_TYPE));
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index 1ea0c12..0610d09 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -185,6 +185,12 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
}
@Override
+ public String getGroupId()
+ {
+ return _delegate.getGroupId();
+ }
+
+ @Override
public String getMessageId()
{
return _delegate.getMessageId();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
index 5eb8406..6156781 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferHeader.java
@@ -42,7 +42,6 @@ class MessageTransferHeader implements AMQMessageHeader
private final DeliveryProperties _deliveryProps;
private final MessageProperties _messageProps;
private final long _arrivalTime;
- private long _notValidBefore;
public MessageTransferHeader(DeliveryProperties deliveryProps,
MessageProperties messageProps,
@@ -100,6 +99,13 @@ class MessageTransferHeader implements AMQMessageHeader
}
@Override
+ public String getGroupId()
+ {
+ Object jmsXGroupId = getHeader("JMSXGroupID");
+ return jmsXGroupId == null ? null : String.valueOf(jmsXGroupId);
+ }
+
+ @Override
public String getMessageId()
{
UUID id = _messageProps == null ? null : _messageProps.getMessageId();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index dbd2194..2cb6b9a 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -28,7 +28,6 @@ import static org.apache.qpid.server.message.mimecontentconverter.ConversionUtil
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -232,6 +231,12 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
}
@Override
+ public String getGroupId()
+ {
+ return _delegate.getGroupId(); // forwarded unchanged from the wrapped delegate header
+ }
+
+ @Override
public String getMessageId()
{
return _delegate.getMessageId();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 00953b9..d5baf87 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -241,6 +241,13 @@ public class MessageMetaData implements StorableMessageMetaData
}
@Override
+ public String getGroupId()
+ {
+ // The "JMSXGroupID" header lookup result is returned untouched (null included); no extra null mapping is needed.
+ return getProperties().getHeaders().getString("JMSXGroupID");
+ }
+
+ @Override
public String getCorrelationId()
{
return getProperties().getCorrelationIdAsString();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 6d59a15..b82b132 100755
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -629,6 +629,19 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
}
@Override
+ public String getGroupId()
+ {
+ if (_propertiesSection == null || _propertiesSection.getValue().getGroupId() == null)
+ {
+ return null;
+ }
+ else
+ {
+ return _propertiesSection.getValue().getGroupId();
+ }
+ }
+
+ @Override
public String getUserId()
{
if (_propertiesSection == null || _propertiesSection.getValue().getUserId() == null)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index f4e7b34..4790b78 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -1859,6 +1859,12 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
}
@Override
+ public String getGroupId()
+ {
+ return null; // this header implementation never reports a group id
+ }
+
+ @Override
public String getMessageId()
{
return _messageId;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/addQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/addQueue.html b/broker-plugins/management-http/src/main/java/resources/addQueue.html
index f726a3a..18fd1d1 100644
--- a/broker-plugins/management-http/src/main/java/resources/addQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/addQueue.html
@@ -316,6 +316,19 @@
<div class="infoMessage">Configuring maximum delivery retries on a queue which has no alternate binding (DLQ or exchange) <br/> will result in messages being discarded after the limit is reached.</div>
<div class="clear">
+ <div class="formLabel-labelCell">Message Group Type:</div>
+ <div class="formLabel-controlCell">
+ <input type="text" id="formAddQueue.messageGroupType"
+ data-dojo-type="dijit/form/FilteringSelect"
+ data-dojo-props="
+ name: 'messageGroupType',
+ required: false,
+ promptMessage: 'Select message grouping type',
+ title: 'Select message grouping type'"/>
+ </div>
+
+ </div>
+ <div class="clear">
<div class="formLabel-labelCell">Message Group Key:</div>
<div class="formLabel-controlCell">
<input type="text" id="formAddQueue.messageGroupKey"
@@ -329,18 +342,6 @@
</div>
</div>
<div class="clear">
- <div class="formLabel-labelCell">Shared Message Groups?</div>
- <div class="formLabel-controlCell">
- <input type="checkbox" id="formAddQueue.messageGroupSharedGroups"
- dojoType="dijit.form.CheckBox"
- data-dojo-props="
- name: 'messageGroupSharedGroups',
- value: 'messageGroupSharedGroups',
- checked: false,
- title: 'Controls where a shared groups feature is enabled'"/>
- </div>
- </div>
- <div class="clear">
<div class="formLabel-labelCell">Hold on Publish Enabled?</div>
<div class="formLabel-controlCell">
<input type="checkbox" id="formAddQueue.holdOnPublishEnabled"
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
index 5a21ab9..91f8793 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
@@ -411,7 +411,7 @@ define(["dojo/_base/declare",
"alternateBinding",
"messageGroups",
"messageGroupKey",
- "messageGroupSharedGroups",
+ "messageGroupType",
"maximumDeliveryAttempts",
"holdOnPublishEnabled"]);
@@ -579,11 +579,11 @@ define(["dojo/_base/declare",
{
this.maximumQueueDepth.style.display = "none";
}
- if (this.queueData["messageGroupKey"])
+ var messageGroupType = this.queueData["messageGroupType"];
+ this["messageGroupType"].innerHTML = entities.encode(messageGroupType);
+ if (this.queueData["messageGroupKey"] || (messageGroupType && messageGroupType !== "NONE"))
{
this.messageGroupKey.innerHTML = entities.encode(String(this.queueData["messageGroupKey"]));
- this.messageGroupSharedGroups.innerHTML =
- entities.encode(String(this.queueData["messageGroupSharedGroups"]));
this.messageGroups.style.display = "block";
}
else
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
index d59ea25..337c94d 100644
--- a/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
+++ b/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
@@ -92,6 +92,7 @@ define(["dojo/dom",
this.queueType = registry.byId("formAddQueue.type");
this.context = registry.byId("formAddQueue.context");
this.overflowPolicyWidget = registry.byId("formAddQueue.overflowPolicy");
+ this.messageGroupTypeWidget = registry.byId("formAddQueue.messageGroupType");
this.editNodeBanner = dom.byId("addQueue.editNoteBanner");
@@ -215,10 +216,16 @@ define(["dojo/dom",
{
this.alternateBindingLoadPromise.then(lang.hitch(this, function ()
{
- var validValues = this.management.metadata.getMetaData("Queue",
+ var validOverflowValues = this.management.metadata.getMetaData("Queue",
this.initialData.type).attributes.overflowPolicy.validValues;
- var validValueStore = util.makeTypeStore(validValues);
- this.overflowPolicyWidget.set("store", validValueStore);
+ var validOverflowValueStore = util.makeTypeStore(validOverflowValues);
+ this.overflowPolicyWidget.set("store", validOverflowValueStore);
+
+ var validGroupingValues = this.management.metadata.getMetaData("Queue",
+ this.initialData.type).attributes.messageGroupType.validValues;
+ var validGroupingValueStore = util.makeTypeStore(validGroupingValues);
+ this.messageGroupTypeWidget.set("store", validGroupingValueStore);
+
util.applyToWidgets(this.form.domNode,
"Queue",
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/broker-plugins/management-http/src/main/java/resources/showQueue.html
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/resources/showQueue.html b/broker-plugins/management-http/src/main/java/resources/showQueue.html
index 87d7ce2..7424614 100644
--- a/broker-plugins/management-http/src/main/java/resources/showQueue.html
+++ b/broker-plugins/management-http/src/main/java/resources/showQueue.html
@@ -93,14 +93,14 @@
<div class="minimumMessageTtl"></div>
</div>
<div class="clear messageGroups">
- <div class="clear">
- <div class="messageGroupKeyLabel formLabel-labelCell ">Message Group Key:</div>
- <div class="messageGroupKey"></div>
- </div>
- <div class="clear">
- <div class="messageGroupSharedGroupsLabel formLabel-labelCell">Shared Message Groups:</div>
- <div class="messageGroupSharedGroups"></div>
- </div>
+ <div class="clear">
+ <div class="messageGroupTypeLabel formLabel-labelCell">Message Group Type:</div>
+ <div class="messageGroupType"></div>
+ </div>
+ <div class="clear">
+ <div class="messageGroupKeyLabel formLabel-labelCell ">Message Group Key:</div>
+ <div class="messageGroupKey"></div>
+ </div>
</div>
<div class="clear"></div>
</div>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
index 0689eaa..1c36eec 100644
--- a/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
+++ b/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
@@ -123,7 +123,7 @@ public class Asserts
LastValueQueue.LVQ_KEY,
SortedQueue.SORT_KEY,
Queue.MESSAGE_GROUP_KEY,
- Queue.MESSAGE_GROUP_SHARED_GROUPS,
+ Queue.MESSAGE_GROUP_TYPE,
PriorityQueue.PRIORITIES,
ConfiguredObject.CONTEXT,
ConfiguredObject.DESIRED_STATE,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3745dd95/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index f44ffe6..577bc2d 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -77,15 +77,25 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
public void testSimpleGroupAssignment() throws Exception
{
- simpleGroupAssignment(false);
+ simpleGroupAssignment(false, false);
}
public void testSharedGroupSimpleGroupAssignment() throws Exception
{
- simpleGroupAssignment(true);
+ simpleGroupAssignment(true, false);
}
+ public void testSimpleGroupAssignmentWithJMSXGroupID() throws Exception
+ {
+ simpleGroupAssignment(false, true); // sharedGroups = false, useDefaultGroup = true (messages are grouped by "JMSXGroupID")
+ }
+
+ public void testSharedGroupSimpleGroupAssignmentWithJMSXGroupID() throws Exception
+ {
+ simpleGroupAssignment(true, true); // sharedGroups = true, useDefaultGroup = true (messages are grouped by "JMSXGroupID")
+ }
+
/**
* Pre populate the queue with messages with groups as follows
*
@@ -113,15 +123,15 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
* c1 ack --->
*
*/
- private void simpleGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
+ private void simpleGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
{
- createQueueAndProducer(sharedGroups);
+ createQueueAndProducer(sharedGroups, useDefaultGroup);
String[] groups = { "ONE", "TWO"};
for (int msg = 0; msg < 4; msg++)
{
- producer.send(createMessage(msg, groups[msg % groups.length]));
+ producer.send(createMessage(msg, groups[msg % groups.length], useDefaultGroup));
}
producerSession.commit();
producer.close();
@@ -149,15 +159,15 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Message cs2Received2 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received second message", cs2Received2);
- assertEquals("Differing groups", cs2Received2.getStringProperty("group"),
- cs2Received.getStringProperty("group"));
+ assertEquals("Differing groups", cs2Received2.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"),
+ cs2Received.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"));
cs1Received.acknowledge();
Message cs1Received2 = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received second message", cs1Received2);
- assertEquals("Differing groups", cs1Received2.getStringProperty("group"),
- cs1Received.getStringProperty("group"));
+ assertEquals("Differing groups", cs1Received2.getStringProperty(useDefaultGroup ? "JMSXGroupID" :"group"),
+ cs1Received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
cs1Received2.acknowledge();
cs2Received2.acknowledge();
@@ -166,20 +176,21 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
assertNull(consumer2.receive(getShortReceiveTimeout()));
}
- private void createQueueAndProducer(final boolean sharedGroups) throws QpidException, JMSException
+ private void createQueueAndProducer(final boolean sharedGroups, final boolean useDefaultKey) throws QpidException, JMSException
{
- if(isBroker10())
+ if(isBroker10() || useDefaultKey)
{
final Map<String, Object> arguments = new HashMap<>();
- arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
- arguments.put(ConfiguredObject.DURABLE, "false");
- arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
- if(sharedGroups)
+ if(!useDefaultKey)
{
- arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_SHARED_GROUPS, "true");
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY, "group");
}
+ arguments.put(ConfiguredObject.DURABLE, "false");
+ arguments.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.toString());
+ arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_TYPE, sharedGroups ? MessageGroupType.SHARED_GROUPS.name() : MessageGroupType.STANDARD.name());
+
createEntityUsingAmqpManagement(QUEUE, producerSession, "org.apache.qpid.Queue", arguments);
- queue = producerSession.createQueue(QUEUE);
+ queue = producerSession.createQueue(isBroker10() ? QUEUE : "ADDR:"+QUEUE+" ; {assert : never, node: { type: queue } }");
}
else
{
@@ -204,12 +215,12 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
public void testConsumerCloseGroupAssignment() throws Exception
{
- consumerCloseGroupAssignment(false);
+ consumerCloseGroupAssignment(false, false);
}
public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
{
- consumerCloseGroupAssignment(true);
+ consumerCloseGroupAssignment(true, false);
}
/**
@@ -229,14 +240,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
* requires c2 to go "backwards" in the queue).
*
**/
- private void consumerCloseGroupAssignment(boolean sharedGroups) throws QpidException, JMSException
+ private void consumerCloseGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
{
- createQueueAndProducer(sharedGroups);
+ createQueueAndProducer(sharedGroups, false);
- producer.send(createMessage(1, "ONE"));
- producer.send(createMessage(2, "ONE"));
- producer.send(createMessage(3, "TWO"));
- producer.send(createMessage(4, "ONE"));
+ producer.send(createMessage(1, "ONE", useDefaultGroup));
+ producer.send(createMessage(2, "ONE", useDefaultGroup));
+ producer.send(createMessage(3, "TWO", useDefaultGroup));
+ producer.send(createMessage(4, "ONE", useDefaultGroup));
producerSession.commit();
producer.close();
@@ -285,7 +296,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Message cs2Received3 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received second message", cs2Received3);
- assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
+ assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
if(is010)
@@ -301,7 +312,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Message cs2Received4 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received third message", cs2Received4);
- assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
+ assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
if(is010)
{
@@ -320,12 +331,12 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
public void testConsumerCloseWithRelease() throws Exception
{
- consumerCloseWithRelease(false);
+ consumerCloseWithRelease(false, false);
}
public void testSharedGroupConsumerCloseWithRelease() throws Exception
{
- consumerCloseWithRelease(true);
+ consumerCloseWithRelease(true, false);
}
@@ -346,14 +357,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
* requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
*
*/
- private void consumerCloseWithRelease(boolean sharedGroups) throws QpidException, JMSException
+ private void consumerCloseWithRelease(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
{
- createQueueAndProducer(sharedGroups);
+ createQueueAndProducer(sharedGroups, false);
- producer.send(createMessage(1, "ONE"));
- producer.send(createMessage(2, "ONE"));
- producer.send(createMessage(3, "TWO"));
- producer.send(createMessage(4, "ONE"));
+ producer.send(createMessage(1, "ONE", useDefaultGroup));
+ producer.send(createMessage(2, "ONE", useDefaultGroup));
+ producer.send(createMessage(3, "TWO", useDefaultGroup));
+ producer.send(createMessage(4, "ONE", useDefaultGroup));
producerSession.commit();
producer.close();
@@ -398,7 +409,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should now have received second message", received);
- assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
received.getJMSRedelivered());
@@ -415,7 +426,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received a third message", received);
- assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
if(is010)
@@ -430,7 +441,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received a fourth message", received);
- assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("Unexpected group", "ONE", received.getStringProperty(useDefaultGroup ? "JMSXGroupID" : "group"));
assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
if(is010)
@@ -447,22 +458,22 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
public void testGroupAssignmentSurvivesEmpty() throws JMSException, QpidException
{
- groupAssignmentOnEmpty(false);
+ groupAssignmentOnEmpty(false, false);
}
public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, QpidException
{
- groupAssignmentOnEmpty(true);
+ groupAssignmentOnEmpty(true, false);
}
- private void groupAssignmentOnEmpty(boolean sharedGroups) throws QpidException, JMSException
+ private void groupAssignmentOnEmpty(boolean sharedGroups, final boolean useDefaultGroup) throws QpidException, JMSException
{
- createQueueAndProducer(sharedGroups);
+ createQueueAndProducer(sharedGroups, useDefaultGroup);
- producer.send(createMessage(1, "ONE"));
- producer.send(createMessage(2, "TWO"));
- producer.send(createMessage(3, "THREE"));
- producer.send(createMessage(4, "ONE"));
+ producer.send(createMessage(1, "ONE", useDefaultGroup));
+ producer.send(createMessage(2, "TWO", useDefaultGroup));
+ producer.send(createMessage(3, "THREE", useDefaultGroup));
+ producer.send(createMessage(4, "ONE", useDefaultGroup));
producerSession.commit();
producer.close();
@@ -561,11 +572,11 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
}
- private Message createMessage(int msg, String group) throws JMSException
+ private Message createMessage(int msg, String group, final boolean useDefaultGroup) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
send.setIntProperty("msg", msg);
- send.setStringProperty("group", group);
+ send.setStringProperty(useDefaultGroup ? "JMSXGroupID" : "group", group);
return send;
}
@@ -580,7 +591,8 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
consumerConnection = getConnectionWithPrefetch(1);
- createQueueAndProducer(true);
+ final boolean useDefaultGroup = false;
+ createQueueAndProducer(true, useDefaultGroup);
int numMessages = 100;
SharedGroupTestMessageListener groupingTestMessageListener = new SharedGroupTestMessageListener(numMessages);
@@ -602,7 +614,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
for(int i = 1; i <= numMessages; i++)
{
- producer.send(createMessage(i, "GROUP"));
+ producer.send(createMessage(i, "GROUP", useDefaultGroup));
}
producerSession.commit();
producer.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org