You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/09/25 16:24:21 UTC

[1/4] git commit: use the message envelopes to track deliveries etc., remove the JmsMessageId class, and have the facades use String for the message ID

Repository: qpid-jms
Updated Branches:
  refs/heads/master 148e1b9e9 -> 2bd7b1cc4


Use the message envelopes to track deliveries etc., remove the JmsMessageId class, and have the facades use String for the message ID.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2bd7b1cc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2bd7b1cc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2bd7b1cc

Branch: refs/heads/master
Commit: 2bd7b1cc4a33f3f5d012ca89c855ae93414e665b
Parents: 2728bf6
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 14:34:10 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    | 18 ++--
 .../jms/message/JmsInboundMessageDispatch.java  | 21 +++++
 .../org/apache/qpid/jms/message/JmsMessage.java | 27 ++----
 .../message/JmsMessagePropertyIntercepter.java  |  5 +-
 .../jms/message/JmsOutboundMessageDispatch.java | 18 ++++
 .../jms/message/facade/JmsMessageFacade.java    | 32 ++++---
 .../defaults/JmsDefaultMessageFacade.java       | 14 ++--
 .../org/apache/qpid/jms/meta/JmsMessageId.java  | 88 --------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 32 ++++---
 .../jms/provider/amqp/AmqpFixedProducer.java    |  2 +-
 .../amqp/message/AmqpJmsMessageFacade.java      | 14 ++--
 .../amqp/message/AmqpJmsMessageFacadeTest.java  | 12 ++-
 12 files changed, 99 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 92aa83a..0c8343c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -67,7 +67,6 @@ import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsMessageTransformation;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConsumerId;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.meta.JmsProducerId;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
@@ -649,28 +648,22 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
                 original.setJMSExpiration(timeStamp + timeToLive);
             }
 
-            JmsMessageId msgId = null;
+            String msgId = null;
             if (!disableMsgId) {
                 msgId = getNextMessageId(producer);
             }
+            original.setJMSMessageID(msgId);
 
             boolean isJmsMessageType = original instanceof JmsMessage;
             if (isJmsMessageType) {
                 ((JmsMessage) original).setConnection(connection);
-                if (!disableMsgId) {
-                    ((JmsMessage) original).setJMSMessageID(msgId);
-                }
                 original.setJMSDestination(destination);
             }
 
             JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original);
 
-            // Ensure original message gets the destination and message ID as per spec.
+            // Ensure original message gets the destination as per spec.
             if (!isJmsMessageType) {
-                if (!disableMsgId) {
-                    original.setJMSMessageID(msgId.toString());
-                    copy.setJMSMessageID(msgId);
-                }
                 original.setJMSDestination(destination);
                 copy.setJMSDestination(destination);
             }
@@ -684,6 +677,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
             envelope.setProducerId(producer.getProducerId());
             envelope.setDestination(destination);
             envelope.setSendAsync(!sync);
+            envelope.setDispatchId(msgId);
 
             this.connection.send(envelope);
         } finally {
@@ -858,8 +852,8 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
         return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet());
     }
 
-    private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
-        return new JmsMessageId(producer.getProducerId().toString() + "-" + producer.getNextMessageSequence());
+    private String getNextMessageId(JmsMessageProducer producer) {
+        return producer.getProducerId().toString() + "-" + producer.getNextMessageSequence();
     }
 
     private <T extends JmsMessage> T init(T message) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index fd8b0b1..46bd4d5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -26,6 +26,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
 
     private JmsConsumerId consumerId;
     private JmsMessage message;
+    private String dispatchId;
 
     public JmsMessage getMessage() {
         return message;
@@ -46,4 +47,24 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
     public void onMessageRedelivered() {
         this.message.incrementRedeliveryCount();
     }
+
+    public void setDispatchId(String dispatchId)
+    {
+        this.dispatchId = dispatchId;
+    }
+
+    @Override
+    public String toString() {
+        String result = "JmsInboundMessageDispatch {dispatchId = ";
+        String id = dispatchId;
+        if (id == null) {
+            result = result + "<null>}";
+        } else {
+            result = result + id + "}";
+        }
+
+        return result;
+    }
+
+    //TODO: equals and hashcode?
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index df104de..eb06e21 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -34,7 +34,6 @@ import javax.jms.MessageNotWriteableException;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.util.TypeConversionSupport;
 
 public class JmsMessage implements javax.jms.Message {
@@ -89,12 +88,10 @@ public class JmsMessage implements javax.jms.Message {
         }
 
         JmsMessage msg = (JmsMessage) o;
-        JmsMessageId oMsg = null;
-        JmsMessageId thisMsg = null;
-
-        thisMsg = facade.getMessageId();
-        oMsg = msg.facade.getMessageId();
+        String oMsg = msg.facade.getMessageId();
+        String thisMsg = facade.getMessageId();
 
+        //TODO: use super.equals if both id are null?
         return thisMsg != null && oMsg != null && oMsg.equals(thisMsg);
     }
 
@@ -129,12 +126,7 @@ public class JmsMessage implements javax.jms.Message {
 
     @Override
     public String getJMSMessageID() throws JMSException {
-        JmsMessageId facadeId = facade.getMessageId();
-        if (facadeId == null) {
-            return null;
-        }
-
-        String value = facadeId.getValue();
+        String value = facade.getMessageId();
         if (value != null && !value.startsWith(ID_PREFIX)) {
             value = ID_PREFIX + value;
         }
@@ -144,16 +136,7 @@ public class JmsMessage implements javax.jms.Message {
 
     @Override
     public void setJMSMessageID(String value) throws JMSException {
-        if (value != null) {
-            JmsMessageId id = new JmsMessageId(value);
-            facade.setMessageId(id);
-        } else {
-            facade.setMessageId(null);
-        }
-    }
-
-    public void setJMSMessageID(JmsMessageId messageId) throws JMSException {
-        facade.setMessageId(messageId);
+        facade.setMessageId(value);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 863f420..22116ee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -29,7 +29,6 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.util.TypeConversionSupport;
 
 /**
@@ -239,7 +238,7 @@ public class JmsMessagePropertyIntercepter {
                 if (message.getMessageId() == null) {
                     return null;
                 }
-                return message.getMessageId().toString();
+                return message.getMessageId();
             }
 
             @Override
@@ -248,7 +247,7 @@ public class JmsMessagePropertyIntercepter {
                 if (rc == null) {
                     throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setMessageId(new JmsMessageId(rc));
+                message.setMessageId(rc);
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
index c77739b..44e211f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsOutboundMessageDispatch.java
@@ -28,6 +28,7 @@ public class JmsOutboundMessageDispatch {
     private JmsMessage message;
     private JmsDestination destination;
     private boolean sendAsync;
+    private String dispatchId;
 
     public JmsDestination getDestination() {
         return destination;
@@ -60,4 +61,21 @@ public class JmsOutboundMessageDispatch {
     public boolean isSendAsync() {
         return sendAsync;
     }
+
+    public void setDispatchId(String dispatchId) {
+        this.dispatchId = dispatchId;
+    }
+
+    @Override
+    public String toString() {
+        String result = "JmsOutboundMessageDispatch {dispatchId = ";
+        String id = dispatchId;
+        if (id == null) {
+            result = result + "<null>}";
+        } else {
+            result = result + id + "}";
+        }
+
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index c8ab9ce..c2d9e8a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -21,7 +21,6 @@ import java.util.Map;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
-import org.apache.qpid.jms.meta.JmsMessageId;
 
 /**
  * The Message Facade interface defines the required mapping between a Provider's
@@ -114,21 +113,6 @@ public interface JmsMessageFacade {
     JmsMessageFacade copy() throws JMSException;
 
     /**
-     * Return the internal message Id as a JmsMessageId wrapped value.
-     *
-     * @return a JmsMessageId that wraps the internal message Id.
-     */
-    JmsMessageId getMessageId();
-
-    /**
-     * Updates the message Id using the value of the given JmsMessageId.
-     *
-     * @param messageId
-     *        the new JmsMessageId value to assign as the message Id.
-     */
-    void setMessageId(JmsMessageId messageId);
-
-    /**
      * Gets the timestamp assigned to the message when it was sent.
      *
      * @return the message timestamp value.
@@ -180,6 +164,21 @@ public interface JmsMessageFacade {
     void setCorrelationIdBytes(byte[] correlationId);
 
     /**
+     * Returns the message ID set on this message if one exists, null otherwise.
+     *
+     * @return the set message ID or null if not set.
+     */
+    String getMessageId();
+
+    /**
+     * Sets the message ID for this message.
+     *
+     * @param messageId
+     *        The message ID to set on this message, or null to clear.
+     */
+    void setMessageId(String messageId);
+
+    /**
      * @return true if this message is tagged as being persistent.
      */
     boolean isPersistent();
@@ -353,5 +352,4 @@ public interface JmsMessageFacade {
      *        the group sequence value to assign this message.
      */
     void setGroupSequence(int groupSequence);
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
index 8a501cc..32e501a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/defaults/JmsDefaultMessageFacade.java
@@ -26,7 +26,6 @@ import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.fusesource.hawtbuf.AsciiBuffer;
 
 /**
@@ -60,7 +59,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
     protected byte priority = javax.jms.Message.DEFAULT_PRIORITY;
     protected String groupId;
     protected int groupSequence;
-    protected JmsMessageId messageId;
+    protected String messageId;
     protected long expiration;
     protected long timestamp;
     protected String correlationId;
@@ -95,10 +94,7 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
         target.destination = this.destination;
         target.replyTo = this.replyTo;
         target.userId = this.userId;
-
-        if (this.messageId != null) {
-            target.messageId = this.messageId.copy();
-        }
+        target.messageId = this.messageId;
 
         if (this.properties != null) {
             target.properties = new HashMap<String, Object>(this.properties);
@@ -147,12 +143,12 @@ public class JmsDefaultMessageFacade implements JmsMessageFacade {
     }
 
     @Override
-    public JmsMessageId getMessageId() {
-        return this.messageId;
+    public String getMessageId() {
+        return messageId;
     }
 
     @Override
-    public void setMessageId(JmsMessageId messageId) {
+    public void setMessageId(String messageId) {
         this.messageId = messageId;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
deleted file mode 100644
index 9a75242..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.meta;
-
-
-/**
- * JMS Message Id class used to uniquely identify messages for the JMS Framework.
- */
-public class JmsMessageId extends JmsAbstractResourceId implements Comparable<JmsMessageId> {
-
-    protected String messageId;
-
-    public JmsMessageId(String messageId) {
-        this.messageId = messageId;
-    }
-
-    public JmsMessageId copy() {
-        JmsMessageId copy = new JmsMessageId(messageId);
-        return copy;
-    }
-
-    /**
-     * @return the set message ID value.
-     */
-    public String getValue() {
-        return messageId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        //TODO: handle messages with no messageId value
-        if (this == o) {
-            return true;
-        }
-        if (o == null || o.getClass() != getClass()) {
-            return false;
-        }
-
-        JmsMessageId id = (JmsMessageId) o;
-        return id.messageId.equals(this.messageId);
-    }
-
-    @Override
-    public int hashCode() {
-        //TODO: handle messages with no messageId value
-        if (hashCode == 0) {
-            hashCode = messageId.hashCode();
-        }
-        return hashCode;
-    }
-
-    @Override
-    public int compareTo(JmsMessageId other) {
-        //TODO: handle messages with no messageId value
-        int result = -1;
-        if (other != null) {
-            result = this.toString().compareTo(other.toString());
-        }
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        String result = "JmsMessageId{messageId = ";
-        Object id = messageId;
-        if (id == null) {
-            result = result + "<null>}";
-        } else {
-            result = result + String.valueOf(id) + "}";
-        }
-
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 1f51b46..d827287 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -28,7 +28,6 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderListener;
@@ -69,7 +68,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     protected final AmqpSession session;
     protected final InboundTransformer inboundTransformer =
         new JMSMappingInboundTransformer(AmqpJMSVendor.INSTANCE);;
-    protected final Map<JmsMessageId, Delivery> delivered = new LinkedHashMap<JmsMessageId, Delivery>();
+    protected final Map<JmsInboundMessageDispatch, Delivery> delivered = new LinkedHashMap<JmsInboundMessageDispatch, Delivery>();
     protected boolean presettle;
 
     private final ByteArrayOutputStream streamBuffer = new ByteArrayOutputStream();
@@ -188,21 +187,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
      * @throws JMSException if an error occurs accessing the Message properties.
      */
     public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
-        JmsMessageId messageId = envelope.getMessage().getFacade().getMessageId();
         Delivery delivery = null;
 
-        if (messageId.getProviderHint() instanceof Delivery) {
-            delivery = (Delivery) messageId.getProviderHint();
+        if (envelope.getProviderHint() instanceof Delivery) {
+            delivery = (Delivery) envelope.getProviderHint();
         } else {
-            delivery = delivered.get(messageId);
+            delivery = delivered.get(envelope);
             if (delivery == null) {
-                LOG.warn("Received Ack for unknown message: {}", messageId);
+                LOG.warn("Received Ack for unknown message: {}", envelope);
                 return;
             }
         }
 
         if (ackType.equals(ACK_TYPE.DELIVERED)) {
-            LOG.debug("Delivered Ack of message: {}", messageId);
+            LOG.debug("Delivered Ack of message: {}", envelope);
             if (session.isTransacted()) {
                 Binary txnId = session.getTransactionContext().getAmqpTransactionId();
                 if (txnId != null) {
@@ -214,16 +212,16 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
                 }
             }
             if (!isPresettle()) {
-                delivered.put(messageId, delivery);
+                delivered.put(envelope, delivery);
             }
             sendFlowIfNeeded();
         } else if (ackType.equals(ACK_TYPE.CONSUMED)) {
             // A Consumer may not always send a delivered ACK so we need to check to
             // ensure we don't add to much credit to the link.
-            if (isPresettle() || delivered.remove(messageId) == null) {
+            if (isPresettle() || delivered.remove(envelope) == null) {
                 sendFlowIfNeeded();
             }
-            LOG.debug("Consumed Ack of message: {}", messageId);
+            LOG.debug("Consumed Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
                 delivery.disposition(Accepted.getInstance());
                 delivery.settle();
@@ -237,7 +235,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         } else if (ackType.equals(ACK_TYPE.POISONED)) {
             deliveryFailed(delivery, false);
         } else {
-            LOG.warn("Unsupporeted Ack Type for message: {}", messageId);
+            LOG.warn("Unsupported Ack Type for message: {}", envelope);
         }
     }
 
@@ -332,9 +330,6 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
             return;
         }
 
-        // Store link to delivery in the hint for use in acknowledge requests.
-        message.getFacade().getMessageId().setProviderHint(incoming);
-
         // We need to signal to the create message that it's being dispatched and for now
         // the transformer creates the message in write mode, onSend will reset it to read
         // mode and the consumer will see it as a normal received message.
@@ -343,7 +338,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch();
         envelope.setMessage(message);
         envelope.setConsumerId(info.getConsumerId());
+        // Store link to delivery in the hint for use in acknowledge requests.
         envelope.setProviderHint(incoming);
+        //TODO: the below messageId retrieval may result in type conversion costs
+        envelope.setDispatchId(message.getJMSMessageID());
 
         // Store reference to envelope in delivery context for recovery
         incoming.setContext(envelope);
@@ -408,13 +406,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         ProviderListener listener = session.getProvider().getProviderListener();
         if (listener != null) {
             if (envelope.getMessage() != null) {
-                LOG.debug("Dispatching received message: {}", envelope.getMessage().getFacade().getMessageId());
+                LOG.debug("Dispatching received message: {}", envelope);
             } else {
                 LOG.debug("Dispatching end of browse to: {}", envelope.getConsumerId());
             }
             listener.onMessage(envelope);
         } else {
-            LOG.error("Provider listener is not set, message will be dropped.");
+            LOG.error("Provider listener is not set, message will be dropped: {}", envelope);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index a36e419..dae51f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -109,7 +109,7 @@ public class AmqpFixedProducer extends AmqpProducer {
     private void doSend(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
         JmsMessageFacade facade = envelope.getMessage().getFacade();
 
-        LOG.trace("Producer sending message: {}", envelope.getMessage().getFacade().getMessageId());
+        LOG.trace("Producer sending message: {}", envelope);
 
         byte[] tag = tagGenerator.getNextTag();
         Delivery delivery = null;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index d0ee5af..8afb6db 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -35,7 +35,6 @@ import javax.jms.MessageFormatException;
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.exceptions.IdConversionException;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.Proton;
@@ -288,31 +287,30 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     }
 
     @Override
-    public JmsMessageId getMessageId() {
+    public String getMessageId() {
         Object underlying = message.getMessageId();
         AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE;
         String baseStringId = helper.toBaseMessageIdString(underlying);
 
         // Ensure the ID: prefix is present.
-        // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
+        // TODO: should we always do this when non-null? AMQP JMS Mapping says never to send the "ID:" prefix.
         // TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
         //       I Ended up putting it in JmsMessage after the above comment, as a workaround for the
         //       current JmsDefaultMessageFacade usage.
         if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) {
             baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId;
         }
-        return new JmsMessageId(baseStringId);
+        return baseStringId;
     }
 
     @Override
-    public void setMessageId(JmsMessageId messageId) {
+    public void setMessageId(String messageId) {
         if (messageId == null) {
             message.setMessageId(null);
         } else {
-            String value = messageId.getValue();
             // Remove the first 'ID:' prefix if present
-            value = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(value);
-            message.setMessageId(value);
+            String stripped = AmqpMessageIdHelper.INSTANCE.stripMessageIdPrefix(messageId);
+            message.setMessageId(stripped);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2bd7b1cc/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index c1200fb..5a14699 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -32,7 +32,6 @@ import java.util.UUID;
 
 import org.apache.qpid.jms.JmsDestination;
 import org.apache.qpid.jms.JmsTopic;
-import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.Proton;
@@ -251,7 +250,7 @@ public class AmqpJmsMessageFacadeTest {
     public void testGetMessageIdIsNullOnNewMessage() {
         AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();
 
-        assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId().getValue());
+        assertNull("Expected messageId value to be null on new message", amqpMessageFacade.getMessageId());
     }
 
     /**
@@ -263,11 +262,10 @@ public class AmqpJmsMessageFacadeTest {
 
         AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();
 
-        JmsMessageId jmsMessageId = new JmsMessageId(testMessageId);
-        amqpMessageFacade.setMessageId(jmsMessageId);
+        amqpMessageFacade.setMessageId(testMessageId);
 
-        assertEquals("Expected messageId object not returned", jmsMessageId, amqpMessageFacade.getMessageId());
-        assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId().getValue());
+        assertEquals("Expected messageId not returned", testMessageId, amqpMessageFacade.getMessageId());
+        assertEquals("ID strings were not equal", testMessageId, amqpMessageFacade.getMessageId());
     }
 
     /**
@@ -326,7 +324,7 @@ public class AmqpJmsMessageFacadeTest {
 
         String expectedString = appendIdAndTypePrefix(testMessageId);
 
-        assertEquals("Incorrect messageId value received", new JmsMessageId(expectedString), amqpMessageFacade.getMessageId());
+        assertEquals("Incorrect messageId value received", expectedString, amqpMessageFacade.getMessageId());
     }
 
     private String appendIdAndTypePrefix(Object testMessageId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] git commit: small refactor for clarity, add note about exception handling

Posted by ro...@apache.org.
small refactor for clarity, add note about exception handling


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1af530ad
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1af530ad
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1af530ad

Branch: refs/heads/master
Commit: 1af530adab2fc8e8d2ca5364aac880ab4bed4ef4
Parents: 148e1b9
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:14:00 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsMessageConsumer.java     | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1af530ad/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 07cba2a..129b1e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -249,9 +249,9 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
             if (message.getAcknowledgeCallback() != null || session.isTransacted()) {
                 // Message has been received by the app.. expand the credit
                 // window so that we receive more messages.
-                session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+                doAckDelivered(envelope);
             } else {
-                doAck(envelope);
+                doAckConsumed(envelope);
             }
             // Tags that we have delivered and can't close if in a TX Session.
             delivered.set(true);
@@ -259,7 +259,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
         return envelope;
     }
 
-    private void doAck(final JmsInboundMessageDispatch envelope) throws JMSException {
+    private void doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
         checkClosed();
         try {
             session.acknowledge(envelope, ACK_TYPE.CONSUMED);
@@ -269,6 +269,11 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
         }
     }
 
+    private void doAckDelivered(final JmsInboundMessageDispatch envelope) throws JMSException {
+        // TODO: this can also throw, so should we handle it the same as doAckConsumed above?
+        session.acknowledge(envelope, ACK_TYPE.DELIVERED);
+    }
+
     /**
      * Called from the session when a new Message has been dispatched to this Consumer
      * from the connection.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] git commit: make the inbound message envelope part of the Id hierarchy

Posted by ro...@apache.org.
make the inbound message envelope part of the Id hierarchy


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2728bf61
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2728bf61
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2728bf61

Branch: refs/heads/master
Commit: 2728bf610d3dc74d23034f6b54813eccb78e81f1
Parents: 11e9f52
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:27:15 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100

----------------------------------------------------------------------
 .../qpid/jms/message/JmsInboundMessageDispatch.java     | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2728bf61/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index e651060..fd8b0b1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -16,16 +16,16 @@
  */
 package org.apache.qpid.jms.message;
 
+import org.apache.qpid.jms.meta.JmsAbstractResourceId;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 
 /**
  * Envelope used to deliver incoming messages to their targeted consumer.
  */
-public class JmsInboundMessageDispatch {
+public class JmsInboundMessageDispatch extends JmsAbstractResourceId{
 
     private JmsConsumerId consumerId;
     private JmsMessage message;
-    private Object providerHint;
 
     public JmsMessage getMessage() {
         return message;
@@ -43,14 +43,6 @@ public class JmsInboundMessageDispatch {
         this.consumerId = consumerId;
     }
 
-    public Object getProviderHint() {
-        return this.providerHint;
-    }
-
-    public void setProviderHint(Object hint) {
-        this.providerHint = hint;
-    }
-
     public void onMessageRedelivered() {
         this.message.incrementRedeliveryCount();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] git commit: reduce JmsMessageId to a single constructor

Posted by ro...@apache.org.
reduce JmsMessageId to a single constructor


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/11e9f52c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/11e9f52c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/11e9f52c

Branch: refs/heads/master
Commit: 11e9f52c42dec0fe371bc4b9ea239413049e33d5
Parents: 1af530a
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Sep 25 11:26:16 2014 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Sep 25 15:23:42 2014 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/qpid/jms/JmsSession.java   |  2 +-
 .../java/org/apache/qpid/jms/meta/JmsMessageId.java     | 12 ------------
 2 files changed, 1 insertion(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11e9f52c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index a7ede86..92aa83a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -859,7 +859,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
     }
 
     private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
-        return new JmsMessageId(producer.getProducerId(), producer.getNextMessageSequence());
+        return new JmsMessageId(producer.getProducerId().toString() + "-" + producer.getNextMessageSequence());
     }
 
     private <T extends JmsMessage> T init(T message) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/11e9f52c/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
index 1698231..9a75242 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsMessageId.java
@@ -24,18 +24,6 @@ public class JmsMessageId extends JmsAbstractResourceId implements Comparable<Jm
 
     protected String messageId;
 
-    public JmsMessageId(JmsProducerInfo producerInfo, long producerSequenceId) {
-        this(producerInfo.getProducerId(), producerSequenceId);
-    }
-
-    public JmsMessageId(JmsProducerId producerId, long producerSequenceId) {
-        this(producerId.toString(), producerSequenceId);
-    }
-
-    public JmsMessageId(String producerId, long producerSequenceId) {
-        this(producerId + "-" + producerSequenceId);
-    }
-
     public JmsMessageId(String messageId) {
         this.messageId = messageId;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org