You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/30 20:24:39 UTC

[1/2] activemq-artemis git commit: This closes #2387

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 8bf549b7c -> 123383b8d


This closes #2387


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/123383b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/123383b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/123383b8

Branch: refs/heads/master
Commit: 123383b8df4e4a5cc6d484e4c801cfc711eb7097
Parents: 8bf549b f30ca44
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Oct 30 16:24:33 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Oct 30 16:24:33 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |   6 +
 .../artemis/core/message/impl/CoreMessage.java  |   6 +
 .../activemq/artemis/reader/MessageUtil.java    |   6 +-
 .../artemis/jms/client/ActiveMQMessage.java     | 123 ++++++++++----
 .../protocol/amqp/broker/AMQPMessage.java       |  11 ++
 .../openwire/OpenWireMessageConverter.java      |   4 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../artemis/core/server/impl/QueueImpl.java     |  34 +++-
 docs/user-manual/en/message-grouping.md         |  17 ++
 .../integration/amqp/JMSMessageGroupsTest.java  | 161 +++++++++++++++++--
 10 files changed, 323 insertions(+), 49 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-2142 Support JMSXGroupSeq -1 to close/reset group.

Posted by cl...@apache.org.
ARTEMIS-2142 Support JMSXGroupSeq -1 to close/reset group.

Add test cases
Add GroupSequence to Message Interface
Implement Support closing/reset group in queue impl
Update Documentation (copy from activemq5)

Change/Fix OpenWireMessageConverter to use default of 0 if not set, for OpenWire as per documentation http://activemq.apache.org/activemq-message-properties.html


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f30ca44c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f30ca44c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f30ca44c

Branch: refs/heads/master
Commit: f30ca44c825efa365380b1a977295637c7cbff92
Parents: 8bf549b
Author: Michael André Pearce <mi...@me.com>
Authored: Mon Oct 22 16:31:00 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Oct 30 16:24:33 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |   6 +
 .../artemis/core/message/impl/CoreMessage.java  |   6 +
 .../activemq/artemis/reader/MessageUtil.java    |   6 +-
 .../artemis/jms/client/ActiveMQMessage.java     | 123 ++++++++++----
 .../protocol/amqp/broker/AMQPMessage.java       |  11 ++
 .../openwire/OpenWireMessageConverter.java      |   4 +-
 .../core/server/ActiveMQServerLogger.java       |   4 +
 .../artemis/core/server/impl/QueueImpl.java     |  34 +++-
 docs/user-manual/en/message-grouping.md         |  17 ++
 .../integration/amqp/JMSMessageGroupsTest.java  | 161 +++++++++++++++++--
 10 files changed, 323 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 667f95f..953e112 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -113,6 +113,8 @@ public interface Message {
     */
    SimpleString HDR_GROUP_ID = new SimpleString("_AMQ_GROUP_ID");
 
+   SimpleString HDR_GROUP_SEQUENCE = new SimpleString("_AMQ_GROUP_SEQUENCE");
+
    /**
     * to determine if the Large Message was compressed.
     */
@@ -248,6 +250,10 @@ public interface Message {
       return null;
    }
 
+   default int getGroupSequence() {
+      return 0;
+   }
+
    SimpleString getReplyTo();
 
    Message setReplyTo(SimpleString address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 62a81a1..d716559 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -289,6 +289,12 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
    }
 
+   @Override
+   public int getGroupSequence() {
+      final Integer integer = this.getIntProperty(Message.HDR_GROUP_SEQUENCE);
+      return integer == null ? 0 : integer;
+   }
+
    /**
     * @param sendBuffer
     * @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index db1f3dc..0eceb61 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -20,7 +20,6 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -49,6 +48,8 @@ public class MessageUtil {
 
    public static final String JMSXGROUPID = "JMSXGroupID";
 
+   public static final String JMSXGROUPSEQ = "JMSXGroupSeq";
+
    public static final String JMSXUSERID = "JMSXUserID";
 
    public static final SimpleString CONNECTION_ID_PROPERTY_NAME = new SimpleString("__AMQ_CID");
@@ -154,6 +155,8 @@ public class MessageUtil {
       for (SimpleString propName : message.getPropertyNames()) {
          if (propName.equals(Message.HDR_GROUP_ID)) {
             set.add(MessageUtil.JMSXGROUPID);
+         } else if (propName.equals(Message.HDR_GROUP_SEQUENCE)) {
+            set.add(MessageUtil.JMSXGROUPSEQ);
          } else if (propName.equals(Message.HDR_VALIDATED_USER)) {
             set.add(MessageUtil.JMSXUSERID);
          } else if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) || propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE) && !propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
@@ -169,6 +172,7 @@ public class MessageUtil {
    public static boolean propertyExists(Message message, String name) {
       return message.containsProperty(new SimpleString(name)) || name.equals(MessageUtil.JMSXDELIVERYCOUNT) ||
          (MessageUtil.JMSXGROUPID.equals(name) && message.containsProperty(Message.HDR_GROUP_ID)) ||
+         (MessageUtil.JMSXGROUPSEQ.equals(name) && message.containsProperty(Message.HDR_GROUP_SEQUENCE)) ||
          (MessageUtil.JMSXUSERID.equals(name) && message.containsProperty(Message.HDR_VALIDATED_USER));
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index a3360ef..791caa8 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.jms.client;
 
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -30,11 +36,6 @@ import javax.management.openmbean.CompositeDataSupport;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -561,12 +562,14 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @Override
    public int getIntProperty(final String name) throws JMSException {
-      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return message.getDeliveryCount();
-      }
-
       try {
-         return message.getIntProperty(name);
+         if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+            return message.getDeliveryCount();
+         } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
+            return message.getGroupSequence();
+         } else {
+            return message.getIntProperty(name);
+         }
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -574,12 +577,14 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @Override
    public long getLongProperty(final String name) throws JMSException {
-      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return message.getDeliveryCount();
-      }
-
       try {
-         return message.getLongProperty(name);
+         if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+            return message.getDeliveryCount();
+         } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
+            return message.getGroupSequence();
+         } else {
+            return message.getLongProperty(name);
+         }
       } catch (ActiveMQPropertyConversionException e) {
          throw new MessageFormatException(e.getMessage());
       }
@@ -611,7 +616,9 @@ public class ActiveMQMessage implements javax.jms.Message {
 
       try {
          if (MessageUtil.JMSXGROUPID.equals(name)) {
-            return message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID);
+            return Objects.toString(message.getGroupID(), null);
+         } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
+            return Integer.toString(message.getGroupSequence());
          } else if (MessageUtil.JMSXUSERID.equals(name)) {
             return message.getValidatedUserID();
          } else {
@@ -624,13 +631,20 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @Override
    public Object getObjectProperty(final String name) throws JMSException {
+      final Object val;
       if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
-         return String.valueOf(message.getDeliveryCount());
+         val = message.getDeliveryCount();
+      } else if (MessageUtil.JMSXGROUPID.equals(name)) {
+         val = message.getGroupID();
+      } else if (MessageUtil.JMSXGROUPSEQ.equals(name)) {
+         val = message.getGroupSequence();
+      } else if (MessageUtil.JMSXUSERID.equals(name)) {
+         val = message.getValidatedUserID();
+      } else {
+         val = message.getObjectProperty(name);
       }
-
-      Object val = message.getObjectProperty(name);
       if (val instanceof SimpleString) {
-         val = val.toString();
+         return val.toString();
       }
       return val;
    }
@@ -662,12 +676,18 @@ public class ActiveMQMessage implements javax.jms.Message {
    @Override
    public void setIntProperty(final String name, final int value) throws JMSException {
       checkProperty(name);
+      if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
+         return;
+      }
       message.putIntProperty(name, value);
    }
 
    @Override
    public void setLongProperty(final String name, final long value) throws JMSException {
       checkProperty(name);
+      if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
+         return;
+      }
       message.putLongProperty(name, value);
    }
 
@@ -687,9 +707,11 @@ public class ActiveMQMessage implements javax.jms.Message {
    public void setStringProperty(final String name, final String value) throws JMSException {
       checkProperty(name);
 
-      if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
+      if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
+         return;
+      } else if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
          return;
-      } else if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
+      } else if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
          return;
       } else {
          message.putStringProperty(name, value);
@@ -698,11 +720,13 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    @Override
    public void setObjectProperty(final String name, final Object value) throws JMSException {
-      if (handleCoreProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
+      if (handleCoreStringProperty(name, value, MessageUtil.JMSXGROUPID, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID)) {
          return;
       }
-
-      if (handleCoreProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
+      if (handleCoreIntegerProperty(name, value, MessageUtil.JMSXGROUPSEQ, org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE)) {
+         return;
+      }
+      if (handleCoreStringProperty(name, value, MessageUtil.JMSXUSERID, org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER)) {
          return;
       }
 
@@ -716,14 +740,14 @@ public class ActiveMQMessage implements javax.jms.Message {
          return;
       }
 
-      checkProperty(name);
-
       if (ActiveMQJMSConstants.JMS_ACTIVEMQ_INPUT_STREAM.equals(name)) {
          setInputStream((InputStream) value);
 
          return;
       }
 
+      checkProperty(name);
+
       try {
          message.putObjectProperty(name, value);
       } catch (ActiveMQPropertyConversionException e) {
@@ -979,10 +1003,47 @@ public class ActiveMQMessage implements javax.jms.Message {
       }
    }
 
-   private boolean handleCoreProperty(final String name,
-                                      final Object value,
-                                      String jmsPropertyName,
-                                      SimpleString corePropertyName) {
+   private boolean handleCoreIntegerProperty(final String name,
+                                            final Object value,
+                                            String jmsPropertyName,
+                                            SimpleString corePropertyName) {
+      if (jmsPropertyName.equals(name)) {
+         return handleCoreIntegerProperty(name, getInteger(value), jmsPropertyName, corePropertyName);
+      }
+      return false;
+   }
+
+   private boolean handleCoreIntegerProperty(final String name,
+                                             final int value,
+                                             String jmsPropertyName,
+                                             SimpleString corePropertyName) {
+      boolean result = false;
+
+      if (jmsPropertyName.equals(name)) {
+         message.putIntProperty(corePropertyName, value);
+         result = true;
+      }
+
+      return result;
+   }
+
+   private static int getInteger(final Object value) {
+      Objects.requireNonNull(value);
+      final int integer;
+      if (value instanceof Integer) {
+         integer = (Integer) value;
+      } else if (value instanceof Number) {
+         integer = ((Number) value).intValue();
+      } else {
+         integer = Integer.parseInt(value.toString());
+      }
+      return integer;
+   }
+
+   private boolean handleCoreStringProperty(final String name,
+                                            final Object value,
+                                            String jmsPropertyName,
+                                            SimpleString corePropertyName) {
       boolean result = false;
 
       if (jmsPropertyName.equals(name)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index c7f1b50..ab67330 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -1100,6 +1100,17 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public int getGroupSequence() {
+      ensureMessageDataScanned();
+
+      if (properties != null && properties.getGroupSequence() != null) {
+         return properties.getGroupSequence().intValue();
+      } else {
+         return 0;
+      }
+   }
+
+   @Override
    public Long getScheduledDeliveryTime() {
       if (scheduledTime < 0) {
          Object objscheduledTime = getMessageAnnotation(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 49d897d..e958267 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -83,7 +83,7 @@ public final class OpenWireMessageConverter {
    private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
    private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
    private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
-   private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
+   private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_SEQUENCE;
    private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
    private static final SimpleString AMQ_MSG_ORIG_DESTINATION =  new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
    private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
@@ -616,7 +616,7 @@ public final class OpenWireMessageConverter {
 
       Integer groupSequence = (Integer) coreMessage.getObjectProperty(AMQ_MSG_GROUP_SEQUENCE);
       if (groupSequence == null) {
-         groupSequence = -1;
+         groupSequence = 0;
       }
       amqMsg.setGroupSequence(groupSequence);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index cad835a..43bed88 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1613,6 +1613,10 @@ public interface ActiveMQServerLogger extends BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void problemAddingConfigReloadCallback(String propertyName, @Cause Exception e);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222278, value = "Unable to extract GroupSequence from message", format = Message.Format.MESSAGE_FORMAT)
+   void unableToExtractGroupSequence(@Cause Throwable e);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 747db5c..292cfc1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2531,8 +2531,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
                   removeMessageReference(holder, ref);
 
-                  if (groupID != null && groupConsumer == null && redistributor == null) {
-                     groups.put(groupID, consumer);
+                  if (redistributor == null) {
+                     handleMessageGroup(ref, consumer, groupConsumer, groupID);
                   }
 
                   handled++;
@@ -2635,6 +2635,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private int extractGroupSequence(MessageReference ref) {
+      if (internalQueue) {
+         return 0;
+      } else {
+         try {
+            // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
+            return ref.getMessage().getGroupSequence();
+         } catch (Throwable e) {
+            ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e);
+            return 0;
+         }
+      }
+   }
+
    protected void refRemoved(MessageReference ref) {
       queueMemorySize.addAndGet(-ref.getMessageMemoryEstimate());
       pendingMetrics.decrementMetrics(ref);
@@ -3110,8 +3124,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             HandleStatus status = handle(ref, consumer);
 
             if (status == HandleStatus.HANDLED) {
-               if (groupID != null && groupConsumer == null && redistributor == null) {
-                  groups.put(groupID, consumer);
+
+               if (redistributor == null) {
+                  handleMessageGroup(ref, consumer, groupConsumer, groupID);
                }
 
                messagesAdded.incrementAndGet();
@@ -3130,6 +3145,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private void handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
+      if (groupID != null) {
+         if (extractGroupSequence(ref) == -1) {
+            groups.remove(groupID);
+         }
+         if (groupConsumer == null) {
+            groups.put(groupID, consumer);
+         }
+      }
+   }
+
    private void proceedDeliver(Consumer consumer, MessageReference reference) {
       try {
          consumer.proceedDeliver(reference);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/docs/user-manual/en/message-grouping.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md
index 0508327..1c4e7ef 100644
--- a/docs/user-manual/en/message-grouping.md
+++ b/docs/user-manual/en/message-grouping.md
@@ -81,6 +81,23 @@ java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialCont
 connectionFactory.myConnectionFactory=tcp://localhost:61616?groupID=Group-0
 ```
 
+
+#### Closing a Message Group
+You generally don't need to close a message group, you just keep using it. 
+
+However if you really do want to close a group you can add a negative sequence number.
+
+Example:
+```java
+Mesasge message = session.createTextMessage("<foo>hey</foo>");
+message.setStringProperty("JMSXGroupID", "Group-0");
+message.setIntProperty("JMSXGroupSeq", -1);
+...
+producer.send(message);
+```
+
+This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer.
+
 ## Example
 
 See the [Message Group Example](examples.md#message-group) which shows how

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f30ca44c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
index 628c814..b0bb8a6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageGroupsTest.java
@@ -21,12 +21,15 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,25 +41,160 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
    private static final int ITERATIONS = 10;
    private static final int MESSAGE_COUNT = 10;
    private static final int MESSAGE_SIZE = 10 * 1024;
-   private static final int RECEIVE_TIMEOUT = 3000;
+   private static final int RECEIVE_TIMEOUT = 1000;
    private static final String JMSX_GROUP_ID = "JmsGroupsTest";
 
+   private ConnectionSupplier AMQPConnection = () -> createConnection();
+   private ConnectionSupplier CoreConnection = () -> createCoreConnection();
+   private ConnectionSupplier OpenWireConnection = () -> createOpenWireConnection();
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsAMQPProducerAMQPConsumer() throws Exception {
+      testMessageGroups(AMQPConnection, AMQPConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsCoreProducerCoreConsumer() throws Exception {
+      testMessageGroups(CoreConnection, CoreConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsCoreProducerAMQPConsumer() throws Exception {
+      testMessageGroups(CoreConnection, AMQPConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsAMQPProducerCoreConsumer() throws Exception {
+      testMessageGroups(AMQPConnection, CoreConnection);
+   }
+
    @Test(timeout = 60000)
-   public void testGroupSeqIsNeverLost() throws Exception {
+   public void testMessageGroupsOpenWireProducerOpenWireConsumer() throws Exception {
+      testMessageGroups(OpenWireConnection, OpenWireConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsCoreProducerOpenWireConsumer() throws Exception {
+      testMessageGroups(CoreConnection, OpenWireConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsOpenWireProducerCoreConsumer() throws Exception {
+      testMessageGroups(OpenWireConnection, CoreConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsAMQPProducerOpenWireConsumer() throws Exception {
+      testMessageGroups(AMQPConnection, OpenWireConnection);
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageGroupsOpenWireProducerAMQPConsumer() throws Exception {
+      testMessageGroups(OpenWireConnection, AMQPConnection);
+   }
+
+
+   public void testMessageGroups(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
+      testGroupSeqIsNeverLost(producerConnectionSupplier, consumerConnectionSupplier);
+      testGroupSeqCloseGroup(producerConnectionSupplier, consumerConnectionSupplier);
+   }
+
+
+   public void testGroupSeqCloseGroup(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
+      final QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(getQueueName()));
+
+      try (Connection producerConnection = producerConnectionSupplier.createConnection();
+           Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+           MessageProducer producer = producerSession.createProducer(producerSession.createQueue(getQueueName()));
+
+           Connection consumerConnection = producerConnectionSupplier.createConnection();
+           Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+           MessageConsumer consumer1 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+           MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()));
+           MessageConsumer consumer3 = consumerSession.createConsumer(consumerSession.createQueue(getQueueName()))) {
+
+         producerConnection.start();
+         consumerConnection.start();
+
+         //Ensure group and close group, ensuring group is closed
+         sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer1, consumer2, consumer3, queueBinding);
+
+         //Ensure round robin on group to consumer assignment (consumer2 now), then close group again
+         sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer2, consumer3, consumer1, queueBinding);
+
+         //Ensure round robin on group to consumer assignment (consumer3 now), then close group again
+         sendAndConsumeAndThenCloseGroup(producerSession, producer, consumer3, consumer1, consumer1, queueBinding);
+
+
+      }
+   }
+
+   private void sendAndConsumeAndThenCloseGroup(Session producerSession, MessageProducer producer, MessageConsumer expectedGroupConsumer, MessageConsumer consumerA, MessageConsumer consumerB, QueueBinding queueBinding) throws JMSException {
+
+      for (int j = 1; j <= MESSAGE_COUNT; j++) {
+         TextMessage message = producerSession.createTextMessage();
+         message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+         message.setIntProperty("JMSXGroupSeq", j);
+         message.setText("Message" + j);
+
+         producer.send(message);
+      }
+
+      //Group should have been reset and next consumer chosen, as such all msgs should now go to the second consumer (round robin'd)
+      for (int j = 1; j <= MESSAGE_COUNT; j++) {
+         TextMessage tm = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT);
+         assertNotNull(tm);
+         assertEquals(JMSX_GROUP_ID, tm.getStringProperty("JMSXGroupID"));
+         assertEquals(j, tm.getIntProperty("JMSXGroupSeq"));
+         assertEquals("Message" + j, tm.getText());
+
+         assertNull(consumerA.receiveNoWait());
+         assertNull(consumerB.receiveNoWait());
+      }
+
+      assertEquals(1, queueBinding.getQueue().getGroupCount());
+
+      TextMessage message = producerSession.createTextMessage();
+      message.setStringProperty("JMSXGroupID", JMSX_GROUP_ID);
+      //Close Group using -1 JMSXGroupSeq
+      message.setIntProperty("JMSXGroupSeq", -1);
+      message.setText("Message" + " group close");
+
+      producer.send(message);
+
+      TextMessage receivedGroupCloseMessage = (TextMessage) expectedGroupConsumer.receive(RECEIVE_TIMEOUT);
+      assertNotNull(receivedGroupCloseMessage);
+      assertEquals(JMSX_GROUP_ID, receivedGroupCloseMessage.getStringProperty("JMSXGroupID"));
+      assertEquals(-1, receivedGroupCloseMessage.getIntProperty("JMSXGroupSeq"));
+      assertEquals("group close should goto the existing group consumer", "Message" + " group close", receivedGroupCloseMessage.getText());
+
+      assertNull(consumerA.receiveNoWait());
+      assertNull(consumerB.receiveNoWait());
+
+      assertEquals(0, queueBinding.getQueue().getGroupCount());
+
+   }
+
+
+   public void testGroupSeqIsNeverLost(ConnectionSupplier producerConnectionSupplier, ConnectionSupplier consumerConnectionSupplier) throws Exception {
       AtomicInteger sequenceCounter = new AtomicInteger();
+      AtomicInteger consumedSequenceCounter = new AtomicInteger();
 
       for (int i = 0; i < ITERATIONS; ++i) {
-         Connection connection = createConnection();
-         try {
-            sendMessagesToBroker(connection, MESSAGE_COUNT, sequenceCounter);
-            readMessagesOnBroker(connection, MESSAGE_COUNT);
-         } finally {
-            connection.close();
+         try (Connection producerConnection = producerConnectionSupplier.createConnection();
+              Connection consumerConnection = producerConnectionSupplier.createConnection()) {
+            sendMessagesToBroker(producerConnection, MESSAGE_COUNT, sequenceCounter);
+            readMessagesOnBroker(consumerConnection, MESSAGE_COUNT, consumedSequenceCounter);
          }
       }
    }
 
-   protected void readMessagesOnBroker(Connection connection, int count) throws Exception {
+   protected void readMessagesOnBroker(Connection connection, int count, AtomicInteger sequence) throws Exception {
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Queue queue = session.createQueue(getQueueName());
       MessageConsumer consumer = session.createConsumer(queue);
@@ -66,9 +204,10 @@ public class JMSMessageGroupsTest extends JMSClientTestSupport {
          assertNotNull(message);
          LOG.debug("Read message #{}: type = {}", i, message.getClass().getSimpleName());
          String gid = message.getStringProperty("JMSXGroupID");
-         String seq = message.getStringProperty("JMSXGroupSeq");
+         int seq = message.getIntProperty("JMSXGroupSeq");
          LOG.debug("Message assigned JMSXGroupID := {}", gid);
          LOG.debug("Message assigned JMSXGroupSeq := {}", seq);
+         assertEquals("Sequence order should match", sequence.incrementAndGet(), seq);
       }
 
       session.close();