You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/09 14:13:01 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1009 Tighten up on AMQP
ARTEMIS-1009 Tighten up on AMQP
avoiding application properties even further
fixing nolocal
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c3690843
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c3690843
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c3690843
Branch: refs/heads/master
Commit: c36908438178e193392c1639067d281ce895b914
Parents: 8aa55a6
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 8 16:11:35 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Mar 9 08:12:42 2017 -0600
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 19 ++++++++-
.../artemis/core/message/impl/CoreMessage.java | 7 +++-
.../protocol/amqp/broker/AMQPMessage.java | 42 ++++++++++++++++----
.../amqp/broker/AMQPSessionCallback.java | 11 +----
.../amqp/proton/ProtonServerSenderContext.java | 6 +--
.../openwire/OpenWireMessageConverter.java | 3 --
.../core/protocol/openwire/OpenwireMessage.java | 2 +-
.../core/postoffice/impl/BindingsImpl.java | 28 ++++++-------
.../core/postoffice/impl/PostOfficeImpl.java | 6 +--
.../core/server/cluster/impl/Redistributor.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 2 +-
.../integration/client/AcknowledgeTest.java | 2 +-
13 files changed, 84 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index c7a831b..5fd0778 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -167,7 +167,9 @@ public interface Message {
default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
Object obj = getDeliveryAnnotationProperty(property);
- if (obj instanceof SimpleString) {
+ if (obj == null) {
+ return null;
+ } else if (obj instanceof SimpleString) {
return (SimpleString)obj;
} else {
return SimpleString.toSimpleString(obj.toString());
@@ -232,6 +234,10 @@ public interface Message {
* */
RefCountMessageListener getContext();
+ default SimpleString getGroupID() {
+ return null;
+ }
+
SimpleString getReplyTo();
Message setReplyTo(SimpleString address);
@@ -256,6 +262,15 @@ public interface Message {
*/
long getMessageID();
+ // used for NO-LOCAL: mainly for AMQP
+ default Message setConnectionID(String connectionID) {
+ return this;
+ }
+
+ default String getConnectionID() {
+ return null;
+ }
+
Message setMessageID(long id);
default boolean isLargeMessage() {
@@ -503,7 +518,7 @@ public interface Message {
Object getObjectProperty(SimpleString key);
- Object removeDeliveryAnnoationProperty(SimpleString key);
+ Object removeDeliveryAnnotationProperty(SimpleString key);
Object getDeliveryAnnotationProperty(SimpleString key);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index e8d7cc5..8d15384 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -104,7 +104,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
/** On core there's no delivery annotation */
@Override
- public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ public Object removeDeliveryAnnotationProperty(SimpleString key) {
return removeProperty(key);
}
@@ -194,6 +194,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
}
+ @Override
+ public SimpleString getGroupID() {
+ return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
+ }
+
/**
*
* @param sendBuffer
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 1859dcf..24175d2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -72,6 +73,7 @@ public class AMQPMessage extends RefCountMessage {
private Properties _properties;
private ApplicationProperties applicationProperties;
private long scheduledTime = -1;
+ private String connectionID;
public AMQPMessage(long messageFormat, byte[] data) {
this.data = Unpooled.wrappedBuffer(data);
@@ -128,7 +130,6 @@ public class AMQPMessage extends RefCountMessage {
}
private Map getApplicationPropertiesMap() {
-
ApplicationProperties appMap = getApplicationProperties();
Map map = null;
@@ -158,6 +159,17 @@ public class AMQPMessage extends RefCountMessage {
parsedHeaders = true;
}
}
+ @Override
+ public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
+ this.connectionID = connectionID;
+ return this;
+ }
+
+ @Override
+ public String getConnectionID() {
+ return connectionID;
+ }
+
public MessageAnnotations getMessageAnnotations() {
parseHeaders();
@@ -222,6 +234,17 @@ public class AMQPMessage extends RefCountMessage {
}
+ @Override
+ public SimpleString getGroupID() {
+ parseHeaders();
+
+ if (_properties != null && _properties.getGroupId() != null) {
+ return SimpleString.toSimpleString(_properties.getGroupId());
+ } else {
+ return null;
+ }
+ }
+
@Override
public Long getScheduledDeliveryTime() {
@@ -667,11 +690,13 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getObjectProperty(String key) {
- if (key.equals("JMSType")) {
+ if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
return getProperties().getSubject();
+ } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
+ return getConnectionID();
+ } else {
+ return getApplicationPropertiesMap().get(key);
}
-
- return getApplicationPropertiesMap().get(key);
}
@Override
@@ -686,10 +711,13 @@ public class AMQPMessage extends RefCountMessage {
@Override
public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
- if (key.equals("JMSType")) {
+ if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
return getProperties().getSubject();
+ } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
+ return getConnectionID();
+ } else {
+ return (String)getApplicationPropertiesMap().get(key);
}
- return (String)getApplicationPropertiesMap().get(key);
}
@Override
@@ -702,7 +730,7 @@ public class AMQPMessage extends RefCountMessage {
}
@Override
- public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ public Object removeDeliveryAnnotationProperty(SimpleString key) {
parseHeaders();
if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
return null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a079190..fb355ff 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -394,10 +393,10 @@ public class AMQPSessionCallback implements SessionCallback {
final Receiver receiver) throws Exception {
try {
-// message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
+ message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+
serverSession.send(transaction, message, false, false);
- // FIXME Potential race here...
manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
@@ -421,10 +420,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
- public String getPubSubPrefix() {
- return manager.getPubSubPrefix();
- }
-
public void offerProducerCredit(final String address,
final int credits,
final int threshold,
@@ -482,8 +477,6 @@ public class AMQPSessionCallback implements SessionCallback {
@Override
public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
- message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
-
ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fcbd47e..a2a8eda 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
@@ -44,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResource
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
+import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -193,7 +193,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean noLocal = false;
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
- String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+ String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector.endsWith(noLocalFilter)) {
if (selector.length() > noLocalFilter.length()) {
@@ -283,7 +283,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
if (filter != null) {
String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
- String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+ String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
if (selector != null) {
selector += " AND " + noLocalFilter;
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3808363..f578ac8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -55,7 +55,6 @@ import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
@@ -374,8 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
}
- ConsumerId consumerId = messageSend.getTargetConsumerId();
-
String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 5793d58..2504a71 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -63,7 +63,7 @@ public class OpenwireMessage implements Message {
}
@Override
- public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2de97e2..c7b6024 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -238,19 +238,17 @@ public final class BindingsImpl implements Bindings {
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
- if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) {
- byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS);
-
- if (ids != null) {
- ByteBuffer buffer = ByteBuffer.wrap(ids);
- while (buffer.hasRemaining()) {
- long id = buffer.getLong();
- for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
- if (entry.getValue() instanceof RemoteQueueBinding) {
- RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
- if (remoteQueueBinding.getRemoteQueueID() == id) {
- message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
- }
+ byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS);
+
+ if (ids != null) {
+ ByteBuffer buffer = ByteBuffer.wrap(ids);
+ while (buffer.hasRemaining()) {
+ long id = buffer.getLong();
+ for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
+ if (entry.getValue() instanceof RemoteQueueBinding) {
+ RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+ if (remoteQueueBinding.getRemoteQueueID() == id) {
+ message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}
@@ -270,10 +268,10 @@ public final class BindingsImpl implements Bindings {
if (!routed) {
// Remove the ids now, in order to avoid double check
- byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS);
+ ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS);
// Fetch the groupId now, in order to avoid double checking
- SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID);
+ SimpleString groupId = message.getGroupID();
if (ids != null) {
routeFromCluster(message, context, ids);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 464859f..e2e7b24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -820,8 +820,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
MessageReference reference = MessageReference.Factory.createReference(message, queue);
- if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
- Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
+ if (scheduledDeliveryTime != null) {
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
@@ -1220,7 +1220,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
AtomicBoolean startedTX) throws Exception {
// Check the DuplicateCache for the Bridge first
- Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
+ Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
if (bridgeDup != null) {
// if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
byte[] bridgeDupBytes = (byte[]) bridgeDup;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index e9477a8..26399dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -131,7 +131,7 @@ public class Redistributor implements Consumer {
public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
if (!active) {
return HandleStatus.BUSY;
- } else if (reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID) != null) {
+ } else if (reference.getMessage().getGroupID() != null) {
//we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
return HandleStatus.NO_MATCH;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f0f8e97..fa111eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2162,7 +2162,7 @@ public class QueueImpl implements Queue {
} else {
try {
// But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
- return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+ return ref.getMessage().getGroupID();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 1569822..2a604e7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -304,7 +304,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d93807f..7181c61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -362,7 +362,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
}
@Override
- public Object removeDeliveryAnnoationProperty(SimpleString key) {
+ public Object removeDeliveryAnnotationProperty(SimpleString key) {
return null;
}