You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/09/25 16:24:21 UTC
[1/4] git commit: use the message envelopes to track deliveries etc,
remove JmsMessageId class and have the facades use String for
messageid
Repository: qpid-jms
Updated Branches:
refs/heads/master 148e1b9e9 -> 2bd7b1cc4
use the message envelopes to track deliveries etc, remove JmsMessageId class and have the facades use String for messageid
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2bd7b1cc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2bd7b1cc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2bd7b1cc
Branch: refs/heads/master
Commit: 2bd7b1cc4a33f3f5d012ca89c855ae93414e665b
Parents: 2728bf6
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 14:34:10 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsSession.java | 18 ++--
.../jms/message/JmsInboundMessageDispatch.java | 21 +++++
.../org/apache/qpid/jms/message/JmsMessage.java | 27 ++----
.../message/JmsMessagePropertyIntercepter.java | 5 +-
.../jms/message/JmsOutboundMessageDispatch.java | 18 ++++
.../jms/message/facade/JmsMessageFacade.java | 32 ++++---
.../defaults/JmsDefaultMessageFacade.java | 14 ++--
.../org/apache/qpid/jms/meta/JmsMessageId.java | 88 --------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 32 ++++---
.../jms/provider/amqp/AmqpFixedProducer.java | 2 +-
.../amqp/message/AmqpJmsMessageFacade.java | 14 ++--
.../amqp/message/AmqpJmsMessageFacadeTest.java | 12 ++-
12 files changed, 99 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 92aa83a..0c8343c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -67,7 +67,6 @@ import org.apache.qpid.jms.message.JmsMessageFactory;
import org.apache.qpid.jms.message.JmsMessageTransformation;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConsumerId;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
@@ -649,28 +648,22 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
original.setJMSExpiration(timeStamp + timeToLive);
}
- JmsMessageId msgId = null;
+ String msgId = null;
if (!disableMsgId) {
msgId = getNextMessageId(producer);
}
+ original.setJMSMessageID(msgId);
boolean isJmsMessageType = original instanceof JmsMessage;
if (isJmsMessageType) {
((JmsMessage) original).setConnection(connection);
- if (!disableMsgId) {
- ((JmsMessage) original).setJMSMessageID(msgId);
- }
original.setJMSDestination(destination);
}
JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original);
- // Ensure original message gets the destination and message ID as per spec.
+ // Ensure original message gets the destination as per spec.
if (!isJmsMessageType) {
- if (!disableMsgId) {
- original.setJMSMessageID(msgId.toString());
- copy.setJMSMessageID(msgId);
- }
original.setJMSDestination(destination);
copy.setJMSDestination(destination);
}
@@ -684,6 +677,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
envelope.setProducerId(producer.getProducerId());
envelope.setDestination(destination);
envelope.setSendAsync(!sync);
+ envelope.setDispatchId(msgId);
this.connection.send(envelope);
} finally {
@@ -858,8 +852,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet());
}
- private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
- return new JmsMessageId(producer.getProducerId().toString() + "-" + producer.getNextMessageSequence());
+ private String getNextMessageId(JmsMessageProducer producer) {
+ return producer.getProducerId().toString() + "-" + producer.getNextMessageSequence();
}
private <T extends JmsMessage> T init(T message) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index fd8b0b1..46bd4d5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -26,6 +26,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
private JmsConsumerId consumerId;
private JmsMessage message;
+ private String dispatchId;
public JmsMessage getMessage() {
return message;
@@ -46,4 +47,24 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
public void onMessageRedelivered() {
this.message.incrementRedeliveryCount();
}
+
+ public void setDispatchId(String dispatchId)
+ {
+ this.dispatchId = dispatchId;
+ }
+
+ @Override
+ public String toString() {
+ String result = "JmsInboundMessageDispatch {dispatchId = ";
+ String id = dispatchId;
+ if (id == null) {
+ result = result + "<null>}";
+ } else {
+ result = result + id + "}";
+ }
+
+ return result;
+ }
+
+ //TODO: equals and hashcode?
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index df104de..eb06e21 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -34,7 +34,6 @@ import javax.jms.MessageNotWriteableException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.util.TypeConversionSupport;
public class JmsMessage implements javax.jms.Message {
@@ -89,12 +88,10 @@ public class JmsMessage implements javax.jms.Message {
}
JmsMessage msg = (JmsMessage) o;
- JmsMessageId oMsg = null;
- JmsMessageId thisMsg = null;
-
- thisMsg = facade.getMessageId();
- oMsg = msg.facade.getMessageId();
+ String oMsg = msg.facade.getMessageId();
+ String thisMsg = facade.getMessageId();
+ //TODO: use super.equals if both id are null?
return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
}
@@ -129,12 +126,7 @@ public class JmsMessage implements javax.jms.Message {
@Override
public String getJMSMessageID() throws JMSException {
- JmsMessageId facadeId = facade.getMessageId();
- if (facadeId == null) {
- return null;
- }
-
- String value = facadeId.getValue();
+ String value = facade.getMessageId();
if (value != null && !value.startsWith(ID_PREFIX)) {
value = ID_PREFIX + value;
}
@@ -144,16 +136,7 @@ public class JmsMessage implements javax.jms.Message {
@Override
public void setJMSMessageID(String value) throws JMSException {
- if (value != null) {
- JmsMessageId id = new JmsMessageId(value);
- facade.setMessageId(id);
- } else {
- facade.setMessageId(null);
- }
- }
-
- public void setJMSMessageID(JmsMessageId messageId) throws JMSException {
- facade.setMessageId(messageId);
+ facade.setMessageId(value);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 863f420..22116ee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -29,7 +29,6 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.util.TypeConversionSupport;
/**
@@ -239,7 +238,7 @@ public class JmsMessagePropertyIntercepter {
if (message.getMessageId() == null) {
return null;
}
- return message.getMessageId().toString();
+ return message.getMessageId();
}
@Override
@@ -248,7 +247,7 @@ public class JmsMessagePropertyIntercepter {
if (rc == null) {
throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + ".");
}
- message.setMessageId(new JmsMessageId(rc));
+ message.setMessageId(rc);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
index c77739b..44e211f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
@@ -28,6 +28,7 @@ public class JmsOutboundMessageDispatch {
private JmsMessage message;
private JmsDestination destination;
private boolean sendAsync;
+ private String dispatchId;
public JmsDestination getDestination() {
return destination;
@@ -60,4 +61,21 @@ public class JmsOutboundMessageDispatch {
public boolean isSendAsync() {
return sendAsync;
}
+
+ public void setDispatchId(String dispatchId) {
+ this.dispatchId = dispatchId;
+ }
+
+ @Override
+ public String toString() {
+ String result = "JmsOutboundMessageDispatch {dispatchId = ";
+ String id = dispatchId;
+ if (id == null) {
+ result = result + "<null>}";
+ } else {
+ result = result + id + "}";
+ }
+
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index c8ab9ce..c2d9e8a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -21,7 +21,6 @@ import java.util.Map;
import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.meta.JmsMessageId;
/**
* The Message Facade interface defines the required mapping between a Provider's
@@ -114,21 +113,6 @@ public interface JmsMessageFacade {
JmsMessageFacade copy() throws JMSException;
/**
- * Return the internal message Id as a JmsMessageId wrapped value.
- *
- * @return a JmsMessageId that wraps the internal message Id.
- */
- JmsMessageId getMessageId();
-
- /**
- * Updates the message Id using the value of the given JmsMessageId.
- *
- * @param messageId
- * the new JmsMessageId value to assign as the message Id.
- */
- void setMessageId(JmsMessageId messageId);
-
- /**
* Gets the timestamp assigned to the message when it was sent.
*
* @return the message timestamp value.
@@ -180,6 +164,21 @@ public interface JmsMessageFacade {
void setCorrelationIdBytes(byte[] correlationId);
/**
+ * Returns the message ID set on this message if one exists, null otherwise.
+ *
+ * @return the set message ID or null if not set.
+ */
+ String getMessageId();
+
+ /**
+ * Sets the message ID for this message.
+ *
+ * @param messageId
+ * The message ID to set on this message, or null to clear.
+ */
+ void setMessageId(String messageId);
+
+ /**
* @return true if this message is tagged as being persistent.
*/
boolean isPersistent();
@@ -353,5 +352,4 @@ public interface JmsMessageFacade {
* the group sequence value to assign this message.
*/
void setGroupSequence(int groupSequence);
-
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
index 8a501cc..32e501a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
@@ -26,7 +26,6 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.fusesource.hawtbuf.AsciiBuffer;
/**
@@ -60,7 +59,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
protected byte priority = javax.jms.Message.DEFAULT_PRIORITY;
protected String groupId;
protected int groupSequence;
- protected JmsMessageId messageId;
+ protected String messageId;
protected long expiration;
protected long timestamp;
protected String correlationId;
@@ -95,10 +94,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
target.destination = this.destination;
target.replyTo = this.replyTo;
target.userId = this.userId;
-
- if (this.messageId != null) {
- target.messageId = this.messageId.copy();
- }
+ target.messageId = this.messageId;
if (this.properties != null) {
target.properties = new HashMap<String, Object>(this.properties);
@@ -147,12 +143,12 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
}
@Override
- public JmsMessageId getMessageId() {
- return this.messageId;
+ public String getMessageId() {
+ return messageId;
}
@Override
- public void setMessageId(JmsMessageId messageId) {
+ public void setMessageId(String messageId) {
this.messageId = messageId;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
deleted file mode 100644
index 9a75242..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.meta;
-
-
-/**
- * JMS Message Id class used to uniquely identify messages for the JMS Framework.
- */
-public class JmsMessageId extends JmsAbstractResourceId implements Comparable<JmsMessageId> {
-
- protected String messageId;
-
- public JmsMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- public JmsMessageId copy() {
- JmsMessageId copy = new JmsMessageId(messageId);
- return copy;
- }
-
- /**
- * @return the set message ID value.
- */
- public String getValue() {
- return messageId;
- }
-
- @Override
- public boolean equals(Object o) {
- //TODO: handle messages with no messageId value
- if (this == o) {
- return true;
- }
- if (o == null || o.getClass() != getClass()) {
- return false;
- }
-
- JmsMessageId id = (JmsMessageId) o;
- return id.messageId.equals(this.messageId);
- }
-
- @Override
- public int hashCode() {
- //TODO: handle messages with no messageId value
- if (hashCode == 0) {
- hashCode = messageId.hashCode();
- }
- return hashCode;
- }
-
- @Override
- public int compareTo(JmsMessageId other) {
- //TODO: handle messages with no messageId value
- int result = -1;
- if (other != null) {
- result = this.toString().compareTo(other.toString());
- }
- return result;
- }
-
- @Override
- public String toString() {
- String result = "JmsMessageId{messageId = ";
- Object id = messageId;
- if (id == null) {
- result = result + "<null>}";
- } else {
- result = result + String.valueOf(id) + "}";
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 1f51b46..d827287 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -28,7 +28,6 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderListener;
@@ -69,7 +68,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
protected final AmqpSession session;
protected final InboundTransformer inboundTransformer =
new JMSMappingInboundTransformer(AmqpJMSVendor.INSTANCE);;
- protected final Map<JmsMessageId, Delivery> delivered = new LinkedHashMap<JmsMessageId, Delivery>();
+ protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
protected boolean presettle;
private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
@@ -188,21 +187,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
* @throws JMSException if an error occurs accessing the Message properties.
*/
public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
- JmsMessageId messageId = envelope.getMessage().getFacade().getMessageId();
Delivery delivery = null;
- if (messageId.getProviderHint() instanceof Delivery) {
- delivery = (Delivery) messageId.getProviderHint();
+ if (envelope.getProviderHint() instanceof Delivery) {
+ delivery = (Delivery) envelope.getProviderHint();
} else {
- delivery = delivered.get(messageId);
+ delivery = delivered.get(envelope);
if (delivery == null) {
- LOG.warn("Received Ack for unknown message: {}", messageId);
+ LOG.warn("Received Ack for unknown message: {}", envelope);
return;
}
}
if (ackType.equals(ACK_TYPE.DELIVERED)) {
- LOG.debug("Delivered Ack of message: {}", messageId);
+ LOG.debug("Delivered Ack of message: {}", envelope);
if (session.isTransacted()) {
Binary txnId = session.getTransactionContext().getAmqpTransactionId();
if (txnId != null) {
@@ -214,16 +212,16 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
}
}
if (!isPresettle()) {
- delivered.put(messageId, delivery);
+ delivered.put(envelope, delivery);
}
sendFlowIfNeeded();
} else if (ackType.equals(ACK_TYPE.CONSUMED)) {
// A Consumer may not always send a delivered ACK so we need to check to
// ensure we don't add to much credit to the link.
- if (isPresettle() || delivered.remove(messageId) == null) {
+ if (isPresettle() || delivered.remove(envelope) == null) {
sendFlowIfNeeded();
}
- LOG.debug("Consumed Ack of message: {}", messageId);
+ LOG.debug("Consumed Ack of message: {}", envelope);
if (!delivery.isSettled()) {
delivery.disposition(Accepted.getInstance());
delivery.settle();
@@ -237,7 +235,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
} else if (ackType.equals(ACK_TYPE.POISONED)) {
deliveryFailed(delivery, false);
} else {
- LOG.warn("Unsupporeted Ack Type for message: {}", messageId);
+ LOG.warn("Unsupported Ack Type for message: {}", envelope);
}
}
@@ -332,9 +330,6 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
return;
}
- // Store link to delivery in the hint for use in acknowledge requests.
- message.getFacade().getMessageId().setProviderHint(incoming);
-
// We need to signal to the create message that it's being dispatched and for now
// the transformer creates the message in write mode, onSend will reset it to read
// mode and the consumer will see it as a normal received message.
@@ -343,7 +338,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch();
envelope.setMessage(message);
envelope.setConsumerId(info.getConsumerId());
+ // Store link to delivery in the hint for use in acknowledge requests.
envelope.setProviderHint(incoming);
+ //TODO: the below messageId retrieval may result in type conversion costs
+ envelope.setDispatchId(message.getJMSMessageID());
// Store reference to envelope in delivery context for recovery
incoming.setContext(envelope);
@@ -408,13 +406,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
ProviderListener listener = session.getProvider().getProviderListener();
if (listener != null) {
if (envelope.getMessage() != null) {
- LOG.debug("Dispatching received message: {}", envelope.getMessage().getFacade().getMessageId());
+ LOG.debug("Dispatching received message: {}", envelope);
} else {
LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId());
}
listener.onMessage(envelope);
} else {
- LOG.error("Provider listener is not set, message will be dropped.");
+ LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index a36e419..dae51f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -109,7 +109,7 @@ public class AmqpFixedProducer extends AmqpProducer {
private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
JmsMessageFacade facade = envelope.getMessage().getFacade();
- LOG.trace("Producer sending message: {}", envelope.getMessage().getFacade().getMessageId());
+ LOG.trace("Producer sending message: {}", envelope);
byte[] tag = tagGenerator.getNextTag();
Delivery delivery = null;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index d0ee5af..8afb6db 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -35,7 +35,6 @@ import javax.jms.MessageFormatException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.exceptions.IdConversionException;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.Proton;
@@ -288,31 +287,30 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
}
@Override
- public JmsMessageId getMessageId() {
+ public String getMessageId() {
Object underlying = message.getMessageId();
AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE;
String baseStringId = helper.toBaseMessageIdString(underlying);
// Ensure the ID: prefix is present.
- // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
+ // TODO: should we always do this when non-null? AMQP JMS Mapping says never to send the "ID:" prefix.
// TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
// I Ended up putting it in JmsMessage after the above comment, as a workaround for the
// current JmsDefaultMessageFacade usage.
if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) {
baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId;
}
- return new JmsMessageId(baseStringId);
+ return baseStringId;
}
@Override
- public void setMessageId(JmsMessageId messageId) {
+ public void setMessageId(String messageId) {
if (messageId == null) {
message.setMessageId(null);
} else {
- String value = messageId.getValue();
// Remove the first 'ID:' prefix if present
- value = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(value);
- message.setMessageId(value);
+ String stripped = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(messageId);
+ message.setMessageId(stripped);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index c1200fb..5a14699 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -32,7 +32,6 @@ import java.util.UUID;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsTopic;
-import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.Proton;
@@ -251,7 +250,7 @@ public class AmqpJmsMessageFacadeTest {
public void testGetMessageIdIsNullOnNewMessage() {
AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();
- assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId().getValue());
+ assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId());
}
/**
@@ -263,11 +262,10 @@ public class AmqpJmsMessageFacadeTest {
AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();
- JmsMessageId jmsMessageId = new JmsMessageId(testMessageId);
- amqpMessageFacade.setMessageId(jmsMessageId);
+ amqpMessageFacade.setMessageId(testMessageId);
- assertEquals("Expected messageId object not returned", jmsMessageId, amqpMessageFacade.getMessageId());
- assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId().getValue());
+ assertEquals("Expected messageId not returned", testMessageId, amqpMessageFacade.getMessageId());
+ assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId());
}
/**
@@ -326,7 +324,7 @@ public class AmqpJmsMessageFacadeTest {
String expectedString = appendIdAndTypePrefix(testMessageId);
- assertEquals("Incorrect messageId value received", new JmsMessageId(expectedString), amqpMessageFacade.getMessageId());
+ assertEquals("Incorrect messageId value received", expectedString, amqpMessageFacade.getMessageId());
}
private String appendIdAndTypePrefix(Object testMessageId) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] git commit: small refactor for clarity,
add note about exception handling
Posted by ro...@apache.org.
small refactor for clarity, add note about exception handling
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1af530ad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1af530ad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1af530ad
Branch: refs/heads/master
Commit: 1af530adab2fc8e8d2ca5364aac880ab4bed4ef4
Parents: 148e1b9
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:14:00 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsMessageConsumer.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1af530ad/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 07cba2a..129b1e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -249,9 +249,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
if (message.getAcknowledgeCallback() != null || session.isTransacted()) {
// Message has been received by the app.. expand the credit
// window so that we receive more messages.
- session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+ doAckDelivered(envelope);
} else {
- doAck(envelope);
+ doAckConsumed(envelope);
}
// Tags that we have delivered and can't close if in a TX Session.
delivered.set(true);
@@ -259,7 +259,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
return envelope;
}
- private void doAck(final JmsInboundMessageDispatch envelope) throws JMSException {
+ private void doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
checkClosed();
try {
session.acknowledge(envelope, ACK_TYPE.CONSUMED);
@@ -269,6 +269,11 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
}
}
+ private void doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException {
+ // TODO: this can also throw, so should we handle it the same as doAckConsumed above?
+ session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+ }
+
/**
* Called from the session when a new Message has been dispatched to this Consumer
* from the connection.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] git commit: make the inbound messsage envelope part of the Id
heirachy
Posted by ro...@apache.org.
make the inbound messsage envelope part of the Id heirachy
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2728bf61
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2728bf61
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2728bf61
Branch: refs/heads/master
Commit: 2728bf610d3dc74d23034f6b54813eccb78e81f1
Parents: 11e9f52
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:27:15 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100
----------------------------------------------------------------------
.../qpid/jms/message/JmsInboundMessageDispatch.java | 12 ++----------
1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2728bf61/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index e651060..fd8b0b1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -16,16 +16,16 @@
*/
package org.apache.qpid.jms.message;
+import org.apache.qpid.jms.meta.JmsAbstractResourceId;
import org.apache.qpid.jms.meta.JmsConsumerId;
/**
* Envelope used to deliver incoming messages to their targeted consumer.
*/
-public class JmsInboundMessageDispatch {
+public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
private JmsConsumerId consumerId;
private JmsMessage message;
- private Object providerHint;
public JmsMessage getMessage() {
return message;
@@ -43,14 +43,6 @@ public class JmsInboundMessageDispatch {
this.consumerId = consumerId;
}
- public Object getProviderHint() {
- return this.providerHint;
- }
-
- public void setProviderHint(Object hint) {
- this.providerHint = hint;
- }
-
public void onMessageRedelivered() {
this.message.incrementRedeliveryCount();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] git commit: reduce JmsMessageId to a single constructor
Posted by ro...@apache.org.
reduce JmsMessageId to a single constructor
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/11e9f52c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/11e9f52c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/11e9f52c
Branch: refs/heads/master
Commit: 11e9f52c42dec0fe371bc4b9ea239413049e33d5
Parents: 1af530a
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:26:16 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/qpid/jms/JmsSession.java | 2 +-
.../java/org/apache/qpid/jms/meta/JmsMessageId.java | 12 ------------
2 files changed, 1 insertion(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11e9f52c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a7ede86..92aa83a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -859,7 +859,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
}
private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
- return new JmsMessageId(producer.getProducerId(), producer.getNextMessageSequence());
+ return new JmsMessageId(producer.getProducerId().toString() + "-" + producer.getNextMessageSequence());
}
private <T extends JmsMessage> T init(T message) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11e9f52c/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
index 1698231..9a75242 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
@@ -24,18 +24,6 @@ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<Jm
protected String messageId;
- public JmsMessageId(JmsProducerInfo producerInfo, long producerSequenceId) {
- this(producerInfo.getProducerId(), producerSequenceId);
- }
-
- public JmsMessageId(JmsProducerId producerId, long producerSequenceId) {
- this(producerId.toString(), producerSequenceId);
- }
-
- public JmsMessageId(String producerId, long producerSequenceId) {
- this(producerId + "-" + producerSequenceId);
- }
-
public JmsMessageId(String messageId) {
this.messageId = messageId;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org