You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/21 22:46:55 UTC

[4/4] qpid-jms git commit: QPIDJMS-121 Add support for controlling the message disposition sent on a call to Message.acknowledge when in a client ack session. The mode is set via a vendor property JMS_AMQP_ACK_TYPE with values for ACCEPTED, REJECTED, RELEASED etc.

QPIDJMS-121 Add support for controlling the message disposition sent on
a call to Message.acknowledge when in a client ack session.  The mode is
set via a vendor property JMS_AMQP_ACK_TYPE with values for ACCEPTED,
REJECTED, RELEASED etc.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ca457d73
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ca457d73
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ca457d73

Branch: refs/heads/master
Commit: ca457d7375233e797fd94fafa18c2e024500236d
Parents: c0bc5f0
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 21 16:27:18 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 21 16:27:18 2015 -0400

----------------------------------------------------------------------
 .../apache/qpid/jms/JmsAcknowledgeCallback.java |   90 ++
 .../java/org/apache/qpid/jms/JmsConnection.java |    8 +-
 .../qpid/jms/JmsLocalTransactionContext.java    |    2 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   17 +-
 .../java/org/apache/qpid/jms/JmsSession.java    |    6 +-
 .../apache/qpid/jms/message/JmsMapMessage.java  |   24 +-
 .../org/apache/qpid/jms/message/JmsMessage.java |  120 +--
 .../message/JmsMessagePropertyIntercepter.java  |  495 ++++++---
 .../qpid/jms/message/JmsMessageSupport.java     |   10 +
 .../jms/message/facade/JmsMessageFacade.java    |    1 +
 .../org/apache/qpid/jms/provider/Provider.java  |   10 +-
 .../qpid/jms/provider/ProviderConstants.java    |   24 +-
 .../qpid/jms/provider/ProviderWrapper.java      |    4 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |   47 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    |    4 +-
 .../qpid/jms/provider/amqp/AmqpSession.java     |    8 +-
 .../qpid/jms/provider/amqp/AmqpSupport.java     |   10 +-
 .../jms/provider/failover/FailoverProvider.java |    4 +-
 .../AmqpAcknowledgementsIntegrationTest.java    |  206 ++++
 .../JmsMessagePropertyIntercepterTest.java      | 1018 +++++++++++++-----
 .../apache/qpid/jms/message/JmsMessageTest.java |   13 +-
 .../failover/FailoverProviderClosedTest.java    |    4 +-
 .../qpid/jms/provider/mock/MockProvider.java    |    2 +-
 23 files changed, 1525 insertions(+), 602 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
new file mode 100644
index 0000000..4923124
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.apache.qpid.jms.message.JmsMessageSupport.ACCEPTED;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsMessageSupport;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+public class JmsAcknowledgeCallback {
+
+    private final JmsSession session;
+    private int ackType;
+
+    public JmsAcknowledgeCallback(JmsSession session) {
+        this.session = session;
+    }
+
+    public void acknowledge() throws JMSException {
+        if (session.isClosed()) {
+            throw new javax.jms.IllegalStateException("Session closed.");
+        }
+
+        session.acknowledge(lookupAckTypeForDisposition(getAckType()));
+    }
+
+    private ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
+        switch (dispositionType) {
+            case JmsMessageSupport.ACCEPTED:
+                return ACK_TYPE.ACCEPTED;
+            case JmsMessageSupport.REJECTED:
+                return ACK_TYPE.REJECTED;
+            case JmsMessageSupport.RELEASED:
+                return ACK_TYPE.RELEASED;
+            case JmsMessageSupport.MODIFIED_FAILED:
+                return ACK_TYPE.MODIFIED_FAILED;
+            case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
+                return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
+            default:
+                throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
+        }
+    }
+
+    /**
+     * @return true if the acknowledgement type was updated.
+     */
+    public boolean isAckTypeSet() {
+        return ackType > 0;
+    }
+
+    /**
+     * Clears any previous setting and restores defaults.
+     */
+    public void clearAckType() {
+        ackType = 0;
+    }
+
+    /**
+     * @return the ackType that has been configured or the default if none has been set.
+     */
+    public int getAckType() {
+        return ackType <= 0 ? ACCEPTED : ackType;
+    }
+
+    /**
+     * Sets the acknowledgement type that will be used.
+     *
+     * @param ackType
+     *      the ackType to apply to the session acknowledge.
+     */
+    public void setAckType(int ackType) {
+        this.ackType = ackType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 1686625..3ee98b5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -660,16 +660,16 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
         }
     }
 
-    void acknowledge(JmsSessionId sessionId) throws JMSException {
-        acknowledge(sessionId, null);
+    void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType) throws JMSException {
+        acknowledge(sessionId, ackType, null);
     }
 
-    void acknowledge(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException {
+    void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, ProviderSynchronization synchronization) throws JMSException {
         checkClosedOrFailed();
 
         try {
             ProviderFuture request = new ProviderFuture(synchronization);
-            provider.acknowledge(sessionId, request);
+            provider.acknowledge(sessionId, ackType, request);
             request.sync();
         } catch (Exception ioe) {
             throw JmsExceptionSupport.create(ioe);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 40a4ad1..6a86033 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -88,7 +88,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
     @Override
     public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
         // Consumed or delivered messages fall into a transaction otherwise just pass it in.
-        if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
+        if (ackType == ACK_TYPE.ACCEPTED || ackType == ACK_TYPE.DELIVERED) {
             lock.readLock().lock();
             try {
                 connection.acknowledge(envelope, ackType, new ProviderSynchronization() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 8d75093..06f036c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -17,7 +17,6 @@
 package org.apache.qpid.jms;
 
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -369,7 +368,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
     private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
         checkClosed();
         try {
-            session.acknowledge(envelope, ACK_TYPE.CONSUMED);
+            session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
         } catch (JMSException ex) {
             session.onException(ex);
             throw ex;
@@ -398,7 +397,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
 
     private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) throws JMSException {
         try {
-            session.acknowledge(envelope, ACK_TYPE.POISONED);
+            session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
         } catch (JMSException ex) {
             session.onException(ex);
             throw ex;
@@ -426,17 +425,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
         lock.lock();
         try {
             if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
-                envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() {
-                    @Override
-                    public Void call() throws Exception {
-                        if (session.isClosed()) {
-                            throw new javax.jms.IllegalStateException("Session closed.");
-                        }
-                        session.acknowledge();
-                        envelope.getMessage().setAcknowledgeCallback(null);
-                        return null;
-                    }
-                });
+                envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(session));
             }
 
             if (envelope.isEnqueueFirst()) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index c0a361b..1350c3c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -693,14 +693,16 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
      * Acknowledge all previously delivered messages in this Session as consumed.  This
      * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
      *
+     * @param ackType
+     *        The type of acknowledgement being done.
      * @throws JMSException if an error occurs while the acknowledge is processed.
      */
-    void acknowledge() throws JMSException {
+    void acknowledge(ACK_TYPE ackType) throws JMSException {
         if (isTransacted()) {
             throw new IllegalStateException("Session acknowledge called inside a transacted Session");
         }
 
-        this.connection.acknowledge(sessionInfo.getId());
+        this.connection.acknowledge(sessionInfo.getId(), ackType);
     }
 
     public boolean isClosed() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
index e5184cd..44327e4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
@@ -275,11 +275,7 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
 
     @Override
     public void setObject(String name, Object value) throws JMSException {
-        // byte[] not allowed on properties so cover that here.
-        if (!(value instanceof byte[])) {
-            checkValidObject(value);
-        }
-
+        checkValidObject(value);
         put(name, value);
     }
 
@@ -316,4 +312,22 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
             throw new IllegalArgumentException("Map key name must not be the empty string");
         }
     }
+
+    private void checkValidObject(Object value) throws MessageFormatException {
+        boolean valid = value instanceof Boolean ||
+                        value instanceof Byte ||
+                        value instanceof Short ||
+                        value instanceof Integer ||
+                        value instanceof Long ||
+                        value instanceof Float ||
+                        value instanceof Double ||
+                        value instanceof Character ||
+                        value instanceof String ||
+                        value instanceof byte[] ||
+                        value == null;
+
+        if (!valid) {
+            throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index fb788fe..f2123c7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -20,7 +20,6 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Callable;
 
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -29,6 +28,7 @@ import javax.jms.MessageFormatException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
+import org.apache.qpid.jms.JmsAcknowledgeCallback;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
@@ -37,7 +37,7 @@ import org.apache.qpid.jms.util.TypeConversionSupport;
 public class JmsMessage implements javax.jms.Message {
 
     private static final String ID_PREFIX = "ID:";
-    protected transient Callable<Void> acknowledgeCallback;
+    protected transient JmsAcknowledgeCallback acknowledgeCallback;
     protected transient JmsConnection connection;
 
     protected final JmsMessageFacade facade;
@@ -94,7 +94,8 @@ public class JmsMessage implements javax.jms.Message {
     public void acknowledge() throws JMSException {
         if (acknowledgeCallback != null) {
             try {
-                acknowledgeCallback.call();
+                acknowledgeCallback.acknowledge();
+                acknowledgeCallback = null;
             } catch (Throwable e) {
                 throw JmsExceptionSupport.create(e);
             }
@@ -248,38 +249,17 @@ public class JmsMessage implements javax.jms.Message {
 
     @Override
     public void clearProperties() throws JMSException {
-        JmsMessagePropertyIntercepter.clearProperties(facade, true);
-        setReadOnlyProperties(false);
+        JmsMessagePropertyIntercepter.clearProperties(this, true);
     }
 
     @Override
     public boolean propertyExists(String name) throws JMSException {
-        try {
-            checkPropertyNameIsValid(name);
-        } catch (IllegalArgumentException iae) {
-            return false;
-        }
-
-        return JmsMessagePropertyIntercepter.propertyExists(facade, name);
+        return JmsMessagePropertyIntercepter.propertyExists(this, name);
     }
 
     @Override
     public Enumeration<?> getPropertyNames() throws JMSException {
-        Set<String> result = new HashSet<String>();
-
-        Set<String> propertyNames = JmsMessagePropertyIntercepter.getPropertyNames(facade, true);
-        for (String name : propertyNames) {
-            try {
-                checkPropertyNameIsValid(name);
-            } catch (IllegalArgumentException iae) {
-                // Don't add the name
-                continue;
-            }
-
-            result.add(name);
-        }
-
-        return Collections.enumeration(result);
+        return Collections.enumeration(JmsMessagePropertyIntercepter.getPropertyNames(this, true));
     }
 
     /**
@@ -292,39 +272,18 @@ public class JmsMessage implements javax.jms.Message {
      */
     public Enumeration<?> getAllPropertyNames() throws JMSException {
         Set<String> result = new HashSet<String>();
-        result.addAll(JmsMessagePropertyIntercepter.getAllPropertyNames(facade));
+        result.addAll(JmsMessagePropertyIntercepter.getAllPropertyNames(this));
         return Collections.enumeration(result);
     }
 
     @Override
     public void setObjectProperty(String name, Object value) throws JMSException {
-        checkReadOnlyProperties();
-        checkPropertyNameIsValid(name);
-        checkValidObject(value);
-        JmsMessagePropertyIntercepter.setProperty(facade, name, value);
-    }
-
-    protected void checkValidObject(Object value) throws MessageFormatException {
-        boolean valid = value instanceof Boolean ||
-                        value instanceof Byte ||
-                        value instanceof Short ||
-                        value instanceof Integer ||
-                        value instanceof Long ||
-                        value instanceof Float ||
-                        value instanceof Double ||
-                        value instanceof Character ||
-                        value instanceof String ||
-                        value == null;
-
-        if (!valid) {
-            throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
-        }
+        JmsMessagePropertyIntercepter.setProperty(this, name, value);
     }
 
     @Override
     public Object getObjectProperty(String name) throws JMSException {
-        checkPropertyNameIsValid(name);
-        return JmsMessagePropertyIntercepter.getProperty(facade, name);
+        return JmsMessagePropertyIntercepter.getProperty(this, name);
     }
 
     @Override
@@ -471,12 +430,12 @@ public class JmsMessage implements javax.jms.Message {
         setObjectProperty(name, value);
     }
 
-    public Callable<Void> getAcknowledgeCallback() {
+    public JmsAcknowledgeCallback getAcknowledgeCallback() {
         return acknowledgeCallback;
     }
 
-    public void setAcknowledgeCallback(Callable<Void> acknowledgeCallback) {
-        this.acknowledgeCallback = acknowledgeCallback;
+    public void setAcknowledgeCallback(JmsAcknowledgeCallback jmsAcknowledgeCallback) {
+        this.acknowledgeCallback = jmsAcknowledgeCallback;
     }
 
     /**
@@ -530,6 +489,8 @@ public class JmsMessage implements javax.jms.Message {
         return "JmsMessage { " + facade + " }";
     }
 
+    //----- State validation methods -----------------------------------------//
+
     protected void checkReadOnlyProperties() throws MessageNotWriteableException {
         if (readOnlyProperties) {
             throw new MessageNotWriteableException("Message properties are read-only");
@@ -547,55 +508,4 @@ public class JmsMessage implements javax.jms.Message {
             throw new MessageNotReadableException("Message body is write-only");
         }
     }
-
-    private void checkPropertyNameIsValid(String propertyName) throws IllegalArgumentException {
-        if (propertyName == null) {
-            throw new IllegalArgumentException("Property name must not be null");
-        } else if (propertyName.length() == 0) {
-            throw new IllegalArgumentException("Property name must not be the empty string");
-        }
-
-        if (isValidatePropertyNames()) {
-            checkIdentifierLetterAndDigitRequirements(propertyName);
-            checkIdentifierIsntNullTrueFalse(propertyName);
-            checkIdentifierIsntLogicOperator(propertyName);
-        }
-    }
-
-    private void checkIdentifierIsntLogicOperator(String identifier) {
-        // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
-        if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
-            "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
-            "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
-
-            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
-        }
-    }
-
-    private void checkIdentifierIsntNullTrueFalse(String identifier) {
-        // Identifiers cannot be the names NULL, TRUE, and FALSE.
-        if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
-            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
-        }
-    }
-
-    private void checkIdentifierLetterAndDigitRequirements(String identifier) {
-        // An identifier is an unlimited-length sequence of letters and digits, the first of
-        // which must be a letter.  A letter is any character for which the method
-        // Character.isJavaLetter returns true.  This includes '_' and '$'.  A letter or digit
-        // is any character for which the method Character.isJavaLetterOrDigit returns true.
-        char startChar = identifier.charAt(0);
-        if (!(Character.isJavaIdentifierStart(startChar))) {
-            throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
-        }
-
-        // JMS part character
-        int length = identifier.length();
-        for (int i = 1; i < length; i++) {
-            char ch = identifier.charAt(i);
-            if (!(Character.isJavaIdentifierPart(ch))) {
-                throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 4dd3c13..a502760 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -20,6 +20,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_DELIVERY_COUNT;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPID;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
@@ -41,9 +42,9 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageFormatException;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
-import org.apache.qpid.jms.message.facade.JmsMessageFacade;
 import org.apache.qpid.jms.util.TypeConversionSupport;
 
 /**
@@ -55,6 +56,7 @@ public class JmsMessagePropertyIntercepter {
     private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS =
         new HashMap<String, PropertyIntercepter>();
     private static final Set<String> STANDARD_HEADERS = new HashSet<String>();
+    private static final Set<String> VENDOR_PROPERTIES = new HashSet<String>();
 
     /**
      * Interface for a Property intercepter object used to write JMS style
@@ -75,7 +77,7 @@ public class JmsMessagePropertyIntercepter {
          *
          * @throws JMSException if an error occurs while accessing the property
          */
-        Object getProperty(JmsMessageFacade message) throws JMSException;
+        Object getProperty(JmsMessage message) throws JMSException;
 
         /**
          * Called when the names property is assigned from an JMS Message object.
@@ -87,7 +89,7 @@ public class JmsMessagePropertyIntercepter {
          *
          * @throws JMSException if an error occurs writing the property.
          */
-        void setProperty(JmsMessageFacade message, Object value) throws JMSException;
+        void setProperty(JmsMessage message, Object value) throws JMSException;
 
         /**
          * Indicates if the intercepted property has a value currently assigned.
@@ -97,7 +99,7 @@ public class JmsMessagePropertyIntercepter {
          *
          * @return true if the intercepted property has a value assigned to it.
          */
-        boolean propertyExists(JmsMessageFacade message);
+        boolean propertyExists(JmsMessage message);
 
         /**
          * Request that the intercepted property be cleared.  For properties that
@@ -109,7 +111,16 @@ public class JmsMessagePropertyIntercepter {
          *
          * @throws JMSException if an error occurs clearing the property.
          */
-        void clearProperty(JmsMessageFacade message) throws JMSException;
+        void clearProperty(JmsMessage message) throws JMSException;
+
+        /**
+         * Return true if the intercepter can bypass the read-only state of a Message
+         * and its properties.
+         *
+         * @return true if the intercepter is immune to read-only state checks.
+         */
+        boolean isAlwaysWritable();
+
     }
 
     static {
@@ -124,15 +135,17 @@ public class JmsMessagePropertyIntercepter {
         STANDARD_HEADERS.add(JMS_EXPIRATION);
         STANDARD_HEADERS.add(JMS_PRIORITY);
 
+        VENDOR_PROPERTIES.add(JMS_AMQP_ACK_TYPE);
+
         PROPERTY_INTERCEPTERS.put(JMS_DESTINATION, new PropertyIntercepter() {
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 throw new JMSException("Cannot set JMS Destination as a property, use setJMSDestination() instead");
             }
 
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                Destination dest = message.getDestination();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                Destination dest = message.getFacade().getDestination();
                 if (dest == null) {
                     return null;
                 }
@@ -140,72 +153,87 @@ public class JmsMessagePropertyIntercepter {
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getDestination() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getDestination() != null;
+            }
+
+            @Override
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setDestination(null);
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setDestination(null);
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_REPLYTO, new PropertyIntercepter() {
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 throw new JMSException("Cannot set JMS ReplyTo as a property, use setJMSReplTo() instead");
             }
 
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                if (message.getReplyTo() == null) {
+            public Object getProperty(JmsMessage message) throws JMSException {
+                if (message.getFacade().getReplyTo() == null) {
                     return null;
                 }
-                return message.getReplyTo().toString();
+                return message.getFacade().getReplyTo().toString();
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getReplyTo() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getReplyTo() != null;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setReplyTo(null);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setReplyTo(null);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_TYPE, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return message.getType();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return message.getFacade().getType();
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 String rc = (String) TypeConversionSupport.convert(value, String.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setType(rc);
+                message.getFacade().setType(rc);
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getType() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getType() != null;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setType(null);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setType(null);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_DELIVERY_MODE, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return message.isPersistent() ? "PERSISTENT" : "NON_PERSISTENT";
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return message.getFacade().isPersistent() ? "PERSISTENT" : "NON_PERSISTENT";
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Integer rc = null;
                 try {
                     rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
@@ -227,258 +255,308 @@ public class JmsMessagePropertyIntercepter {
                     if (bool == null) {
                         throw new JMSException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
                     } else {
-                        message.setPersistent(bool.booleanValue());
+                        message.getFacade().setPersistent(bool.booleanValue());
                     }
                 } else {
-                    message.setPersistent(rc == DeliveryMode.PERSISTENT);
+                    message.getFacade().setPersistent(rc == DeliveryMode.PERSISTENT);
                 }
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
+            public boolean propertyExists(JmsMessage message) {
                 return true;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setPersistent(true); // Default value
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setPersistent(true); // Default value
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_PRIORITY, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return Integer.valueOf(message.getPriority());
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Integer.valueOf(message.getFacade().getPriority());
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setPriority(rc.byteValue());
+                message.getFacade().setPriority(rc.byteValue());
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
+            public boolean propertyExists(JmsMessage message) {
                 return true;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setPriority(Message.DEFAULT_PRIORITY);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setPriority(Message.DEFAULT_PRIORITY);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_MESSAGEID, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                if (message.getMessageId() == null) {
+            public Object getProperty(JmsMessage message) throws JMSException {
+                if (message.getFacade().getMessageId() == null) {
                     return null;
                 }
-                return message.getMessageId();
+                return message.getFacade().getMessageId();
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 String rc = (String) TypeConversionSupport.convert(value, String.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setMessageId(rc);
+                message.getFacade().setMessageId(rc);
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getMessageId() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getMessageId() != null;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setMessageId(null);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setMessageId(null);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_TIMESTAMP, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return Long.valueOf(message.getTimestamp());
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Long.valueOf(message.getFacade().getTimestamp());
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setTimestamp(rc.longValue());
+                message.getFacade().setTimestamp(rc.longValue());
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getTimestamp() > 0;
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getTimestamp() > 0;
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setTimestamp(0);
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setTimestamp(0);
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_CORRELATIONID, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return message.getCorrelationId();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return message.getFacade().getCorrelationId();
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 String rc = (String) TypeConversionSupport.convert(value, String.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setCorrelationId(rc);
+                message.getFacade().setCorrelationId(rc);
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getCorrelationId() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getCorrelationId() != null;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) throws JMSException {
-                message.setCorrelationId(null);
+            public void clearProperty(JmsMessage message) throws JMSException {
+                message.getFacade().setCorrelationId(null);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_EXPIRATION, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return Long.valueOf(message.getExpiration());
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Long.valueOf(message.getFacade().getExpiration());
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setExpiration(rc.longValue());
+                message.getFacade().setExpiration(rc.longValue());
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getExpiration() > 0;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getExpiration() > 0;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setExpiration(0);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setExpiration(0);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMS_REDELIVERED, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return Boolean.valueOf(message.isRedelivered());
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Boolean.valueOf(message.getFacade().isRedelivered());
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setRedelivered(rc.booleanValue());
+                message.getFacade().setRedelivered(rc.booleanValue());
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().isRedelivered();
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.isRedelivered();
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setRedelivered(false);
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setRedelivered(false);
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMSX_DELIVERY_COUNT, new PropertyIntercepter() {
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setDeliveryCount(rc.intValue());
+                message.getFacade().setDeliveryCount(rc.intValue());
             }
 
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return Integer.valueOf(message.getDeliveryCount());
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Integer.valueOf(message.getFacade().getDeliveryCount());
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
+            public boolean propertyExists(JmsMessage message) {
                 return true;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setDeliveryCount(1);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setDeliveryCount(1);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMSX_GROUPID, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return message.getGroupId();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return message.getFacade().getGroupId();
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 String rc = (String) TypeConversionSupport.convert(value, String.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setGroupId(rc);
+                message.getFacade().setGroupId(rc);
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getGroupId() != null;
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getGroupId() != null;
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setGroupId(null);
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setGroupId(null);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMSX_GROUPSEQ, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                return message.getGroupSequence();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return message.getFacade().getGroupSequence();
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
                 if (rc == null) {
                     throw new JMSException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setGroupSequence(rc.intValue());
+                message.getFacade().setGroupSequence(rc.intValue());
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getGroupSequence() != 0;
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getGroupSequence() != 0;
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setGroupSequence(0);
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setGroupSequence(0);
+            public boolean isAlwaysWritable() {
+                return false;
             }
         });
         PROPERTY_INTERCEPTERS.put(JMSX_USERID, new PropertyIntercepter() {
             @Override
-            public Object getProperty(JmsMessageFacade message) throws JMSException {
-                Object userId = message.getUserId();
+            public Object getProperty(JmsMessage message) throws JMSException {
+                Object userId = message.getFacade().getUserId();
                 if (userId == null) {
                     try {
-                        userId = message.getProperty("JMSXUserID");
+                        userId = message.getFacade().getProperty("JMSXUserID");
                     } catch (Exception e) {
                         throw JmsExceptionSupport.create(e);
                     }
@@ -488,21 +566,75 @@ public class JmsMessagePropertyIntercepter {
             }
 
             @Override
-            public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
                 if (!(value instanceof String)) {
                     throw new JMSException("Property JMSXUserID cannot be set from a " + value.getClass().getName() + ".");
                 }
-                message.setUserId((String) value);
+                message.getFacade().setUserId((String) value);
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getUserId() != null;
             }
 
             @Override
-            public boolean propertyExists(JmsMessageFacade message) {
-                return message.getUserId() != null;
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setUserId(null);
             }
 
             @Override
-            public void clearProperty(JmsMessageFacade message) {
-                message.setUserId(null);
+            public boolean isAlwaysWritable() {
+                return false;
+            }
+        });
+        PROPERTY_INTERCEPTERS.put(JMS_AMQP_ACK_TYPE, new PropertyIntercepter() {
+            @Override
+            public Object getProperty(JmsMessage message) throws JMSException {
+                Object ackType = null;
+
+                if (message.getAcknowledgeCallback() != null &&
+                    message.getAcknowledgeCallback().isAckTypeSet()) {
+
+                    ackType = message.getAcknowledgeCallback().getAckType();
+                }
+
+                return ackType;
+            }
+
+            @Override
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
+                if (message.getAcknowledgeCallback() == null) {
+                    throw new JMSException("Session Acknowledgement Mode does not allow setting: " + JMS_AMQP_ACK_TYPE);
+                }
+                // No acknowledge callback means there is nowhere to record the ack type; otherwise coerce to Integer.
+                Integer ackType = (Integer) TypeConversionSupport.convert(value, Integer.class);
+                if (ackType == null) {
+                    throw new JMSException("Property " + JMS_AMQP_ACK_TYPE + " cannot be set from a " + value.getClass().getName() + ".");
+                }
+
+                message.getAcknowledgeCallback().setAckType(ackType);
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                if (message.getAcknowledgeCallback() != null) {
+                    return message.getAcknowledgeCallback().isAckTypeSet();
+                }
+
+                return false;
+            }
+
+            @Override
+            public void clearProperty(JmsMessage message) throws JMSException {
+                if (message.getAcknowledgeCallback() != null) {
+                    message.getAcknowledgeCallback().clearAckType();
+                }
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return true;
             }
         });
     }
@@ -513,7 +645,7 @@ public class JmsMessagePropertyIntercepter {
      * method.
      *
      * @param message
-     *        the JmsMessageFacade instance to read from
+     *        the JmsMessage instance to read from
      * @param name
      *        the property name that is being requested.
      *
@@ -521,14 +653,16 @@ public class JmsMessagePropertyIntercepter {
      *
      * @throws JMSException if an error occurs while reading the defined property.
      */
-    public static Object getProperty(JmsMessageFacade message, String name) throws JMSException {
+    public static Object getProperty(JmsMessage message, String name) throws JMSException {
         Object value = null;
 
+        checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+
         PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
         if (jmsPropertyExpression != null) {
             value = jmsPropertyExpression.getProperty(message);
         } else {
-            value = message.getProperty(name);
+            value = message.getFacade().getProperty(name);
         }
 
         return value;
@@ -540,7 +674,7 @@ public class JmsMessagePropertyIntercepter {
      * method.
      *
      * @param message
-     *        the JmsMessageFacade instance to write to.
+     *        the JmsMessage instance to write to.
      * @param name
      *        the property name that is being written.
      * @param value
@@ -548,12 +682,19 @@ public class JmsMessagePropertyIntercepter {
      *
      * @throws JMSException if an error occurs while writing the defined property.
      */
-    public static void setProperty(JmsMessageFacade message, String name, Object value) throws JMSException {
+    public static void setProperty(JmsMessage message, String name, Object value) throws JMSException {
         PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
+
+        if (jmsPropertyExpression == null || !jmsPropertyExpression.isAlwaysWritable()) {
+            message.checkReadOnlyProperties();
+        }
+        checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+        checkValidObject(value);
+
         if (jmsPropertyExpression != null) {
             jmsPropertyExpression.setProperty(message, value);
         } else {
-            message.setProperty(name, value);
+            message.getFacade().setProperty(name, value);
         }
     }
 
@@ -561,7 +702,7 @@ public class JmsMessagePropertyIntercepter {
      * Static inspection method to determine if a named property exists for a given message.
      *
      * @param message
-     *        the JmsMessageFacade instance to read from
+     *        the JmsMessage instance to read from
      * @param name
      *        the property name that is being inspected.
      *
@@ -569,12 +710,18 @@ public class JmsMessagePropertyIntercepter {
      *
      * @throws JMSException if an error occurs while validating the defined property.
      */
-    public static boolean propertyExists(JmsMessageFacade message, String name) throws JMSException {
+    public static boolean propertyExists(JmsMessage message, String name) throws JMSException {
+        try {
+            checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+        } catch (IllegalArgumentException iae) {
+            return false;
+        }
+
         PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
         if (jmsPropertyExpression != null) {
             return jmsPropertyExpression.propertyExists(message);
         } else {
-            return message.propertyExists(name);
+            return message.getFacade().propertyExists(name);
         }
     }
 
@@ -584,13 +731,13 @@ public class JmsMessagePropertyIntercepter {
      * message facade to clear any message properties that might have been set.
      *
      * @param message
-     *        the JmsMessageFacade instance to read from
+     *        the JmsMessage instance to read from
      * @param excludeStandardJMSHeaders
      *        whether the standard JMS header names should be excluded from the returned set
      *
      * @throws JMSException if an error occurs while validating the defined property.
      */
-    public static void clearProperties(JmsMessageFacade message, boolean excludeStandardJMSHeaders) throws JMSException {
+    public static void clearProperties(JmsMessage message, boolean excludeStandardJMSHeaders) throws JMSException {
         for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) {
             if (excludeStandardJMSHeaders && STANDARD_HEADERS.contains(entry.getKey())) {
                 continue;
@@ -599,7 +746,8 @@ public class JmsMessagePropertyIntercepter {
             entry.getValue().clearProperty(message);
         }
 
-        message.clearProperties();
+        message.getFacade().clearProperties();
+        message.setReadOnlyProperties(false);
     }
 
     /**
@@ -607,15 +755,15 @@ public class JmsMessagePropertyIntercepter {
      * string key value is inserted into an Set and returned.
      *
      * @param message
-     *        the JmsMessageFacade instance to read property names from.
+     *        the JmsMessage instance to read property names from.
      *
      * @return a {@code Set<String>} containing the names of all intercepted properties.
      *
      * @throws JMSException if an error occurs while gathering the message property names.
      */
-    public static Set<String> getAllPropertyNames(JmsMessageFacade message) throws JMSException {
+    public static Set<String> getAllPropertyNames(JmsMessage message) throws JMSException {
         Set<String> names = new HashSet<String>(PROPERTY_INTERCEPTERS.keySet());
-        names.addAll(message.getPropertyNames());
+        names.addAll(message.getFacade().getPropertyNames());
         return names;
     }
 
@@ -627,7 +775,7 @@ public class JmsMessagePropertyIntercepter {
      * will be returned if there are no matching properties.
      *
      * @param message
-     *        the JmsMessageFacade instance to read from
+     *        the JmsMessage instance to read from
      * @param excludeStandardJMSHeaders
      *        whether the standard JMS header names should be excluded from the returned set
      *
@@ -635,7 +783,7 @@ public class JmsMessagePropertyIntercepter {
      *
      * @throws JMSException if an error occurs while gathering the message property names.
      */
-    public static Set<String> getPropertyNames(JmsMessageFacade message, boolean excludeStandardJMSHeaders) throws JMSException {
+    public static Set<String> getPropertyNames(JmsMessage message, boolean excludeStandardJMSHeaders) throws JMSException {
         Set<String> names = new HashSet<String>();
         for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) {
             if (excludeStandardJMSHeaders && STANDARD_HEADERS.contains(entry.getKey())) {
@@ -647,8 +795,87 @@ public class JmsMessagePropertyIntercepter {
             }
         }
 
-        names.addAll(message.getPropertyNames());
+        for (String name : message.getFacade().getPropertyNames()) {
+            try {
+                checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+            } catch (IllegalArgumentException iae) {
+                // Don't add the name
+                continue;
+            }
+
+            names.add(name);
+        }
 
         return names;
     }
+
+    //----- Property Validation Methods --------------------------------------//
+
+    private static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
+        if (propertyName == null) {
+            throw new IllegalArgumentException("Property name must not be null");
+        } else if (propertyName.length() == 0) {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        if (validateNames) {
+            checkIdentifierLetterAndDigitRequirements(propertyName);
+            checkIdentifierIsntNullTrueFalse(propertyName);
+            checkIdentifierIsntLogicOperator(propertyName);
+        }
+    }
+
+    private static void checkIdentifierIsntLogicOperator(String identifier) {
+        // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
+        if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
+            "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
+            "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
+
+            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+        }
+    }
+
+    private static void checkIdentifierIsntNullTrueFalse(String identifier) {
+        // Identifiers cannot be the names NULL, TRUE, and FALSE.
+        if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
+            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+        }
+    }
+
+    private static void checkIdentifierLetterAndDigitRequirements(String identifier) {
+        // An identifier is an unlimited-length sequence of letters and digits, the first of
+        // which must be a letter ('_' and '$' count as letters).  The rule is usually phrased
+        // in terms of Character.isJavaLetter / isJavaLetterOrDigit; this method applies it
+        // with Character.isJavaIdentifierStart / Character.isJavaIdentifierPart.
+        char startChar = identifier.charAt(0);
+        if (!(Character.isJavaIdentifierStart(startChar))) {
+            throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
+        }
+
+        // Every character after the first must be a valid identifier part; fail on the first that is not.
+        int length = identifier.length();
+        for (int i = 1; i < length; i++) {
+            char ch = identifier.charAt(i);
+            if (!(Character.isJavaIdentifierPart(ch))) {
+                throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
+            }
+        }
+    }
+
+    private static void checkValidObject(Object value) throws MessageFormatException {
+        boolean valid = value instanceof Boolean ||
+                        value instanceof Byte ||
+                        value instanceof Short ||
+                        value instanceof Integer ||
+                        value instanceof Long ||
+                        value instanceof Float ||
+                        value instanceof Double ||
+                        value instanceof Character ||
+                        value instanceof String ||
+                        value == null;
+
+        if (!valid) {
+            throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index 29a70b4..31f2120 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -37,4 +37,14 @@ public class JmsMessageSupport {
     public static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
     public static final String JMSX_USERID = "JMSXUserID";
 
+    public static final String JMS_AMQP_ACK_TYPE = "JMS_AMQP_ACK_TYPE";
+
+    // TODO: advise against using these constants, since doing so won't be portable?
+    // Make them package private so they can't be used to begin with?
+    public static final int ACCEPTED = 1;
+    public static final int REJECTED = 2;
+    public static final int RELEASED = 3;
+    public static final int MODIFIED_FAILED = 4;
+    public static final int MODIFIED_FAILED_UNDELIVERABLE = 5;
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index 15bde3c..9a972c1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -406,4 +406,5 @@ public interface JmsMessageFacade {
      *        The message ID to set on this message, or null to clear.
      */
     void setProviderMessageIdObject(Object messageId);
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index ad5993f..d430abf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -187,29 +187,31 @@ public interface Provider {
      * Called to acknowledge all messages that have been delivered in a given session.
      *
      * This method is typically used by a Session that is configured for client acknowledge
-     * mode.  The acknowledgment should only be applied to Messages that have been marked
+     * mode.  The acknowledgement should only be applied to Messages that have been marked
      * as delivered and not those still awaiting dispatch.
      *
      * @param sessionId
      *        the ID of the Session whose delivered messages should be acknowledged.
+     * @param ackType
+     *        The type of acknowledgement being done.
      * @param request
      *        The request object that should be signaled when this operation completes.
      *
      * @throws IOException if an error occurs or the Provider is already closed.
      * @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
      */
-    void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+    void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException;
 
     /**
      * Called to acknowledge a JmsMessage has been delivered, consumed, re-delivered...etc.
      *
-     * The provider should perform an acknowledgment for the message based on the configured
+     * The provider should perform an acknowledgement for the message based on the configured
      * mode of the consumer that it was dispatched to and the capabilities of the protocol.
      *
      * @param envelope
      *        The message dispatch envelope containing the Message delivery information.
      * @param ackType
-     *        The type of acknowledgment being done.
+     *        The type of acknowledgement being done.
      * @param request
      *        The request object that should be signaled when this operation completes.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index 9071ba8..1742a13 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -24,20 +24,14 @@ public final class ProviderConstants {
     private ProviderConstants() {}
 
     public enum ACK_TYPE {
-        DELIVERED(0),
-        CONSUMED(1),
-        POISONED(2),
-        EXPIRED(3),
-        RELEASED(4);
-
-        private final int value;
-
-        private ACK_TYPE(int value) {
-            this.value = value;
-        }
-
-        public int getValue() {
-            return value;
-        }
+        // Aligned with AMQP dispositions
+        ACCEPTED,
+        RELEASED,
+        REJECTED,
+        MODIFIED_FAILED,
+        MODIFIED_FAILED_UNDELIVERABLE,
+        // Conceptual
+        DELIVERED,
+        EXPIRED;
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index c053c22..bc4073c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -98,8 +98,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
-    public void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
-        next.acknowledge(sessionId, request);
+    public void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
+        next.acknowledge(sessionId, ackType, request);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 877cd1b..1b61313 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -17,7 +17,8 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
-import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_UNDELIVERABLE;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
@@ -167,13 +168,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
      * Only messages that have already been acknowledged as delivered by the JMS
      * framework will be in the delivered Map.  This means that the link credit
      * would already have been given for these so we just need to settle them.
+     *
+     * @param ackType the type of acknowledgement to perform
      */
-    public void acknowledge() {
-        LOG.trace("Session Acknowledge for consumer: {}", getResourceInfo().getId());
+    public void acknowledge(ACK_TYPE ackType) {
+        LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);
         for (Delivery delivery : delivered.values()) {
-            delivery.disposition(Accepted.getInstance());
+            switch (ackType) {
+                case ACCEPTED:
+                    delivery.disposition(Accepted.getInstance());
+                    break;
+                case RELEASED:
+                    delivery.disposition(Released.getInstance());
+                    break;
+                case REJECTED:
+                    delivery.disposition(REJECTED);
+                    break;
+                case MODIFIED_FAILED:
+                    delivery.disposition(MODIFIED_FAILED);
+                    break;
+                case MODIFIED_FAILED_UNDELIVERABLE:
+                    delivery.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
+            }
+
             delivery.settle();
         }
+
         delivered.clear();
     }
 
@@ -209,13 +232,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             }
             setDefaultDeliveryState(delivery, MODIFIED_FAILED);
             sendFlowIfNeeded();
-        } else if (ackType.equals(ACK_TYPE.CONSUMED)) {
+        } else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
             // A Consumer may not always send a DELIVERED ack so we need to
             // check to ensure we don't add too much credit to the link.
             if (isPresettle() || delivered.remove(envelope) == null) {
                 sendFlowIfNeeded();
             }
-            LOG.debug("Consumed Ack of message: {}", envelope);
+            LOG.debug("Accepted Ack of message: {}", envelope);
             if (!delivery.isSettled()) {
                 if (session.isTransacted() && !getResourceInfo().isBrowser()) {
 
@@ -238,10 +261,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                     delivery.settle();
                 }
             }
-        } else if (ackType.equals(ACK_TYPE.POISONED)) {
-            deliveryFailed(delivery);
+        } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
+            deliveryFailedUndeliverable(delivery);
         } else if (ackType.equals(ACK_TYPE.EXPIRED)) {
-            deliveryFailed(delivery);
+            deliveryFailedUndeliverable(delivery);
         } else if (ackType.equals(ACK_TYPE.RELEASED)) {
             delivery.disposition(Released.getInstance());
             delivery.settle();
@@ -399,7 +422,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             //        In the future once the JMS mapping is complete we should be
             //        able to convert everything to some message even if its just
             //        a bytes messages as a fall back.
-            deliveryFailed(incoming);
+            deliveryFailedUndeliverable(incoming);
             return false;
         }
 
@@ -482,8 +505,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         return "AmqpConsumer { " + getResourceInfo().getId() + " }";
     }
 
-    protected void deliveryFailed(Delivery incoming) {
-        incoming.disposition(MODIFIED_UNDELIVERABLE);
+    protected void deliveryFailedUndeliverable(Delivery incoming) {
+        incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
         incoming.settle();
         // TODO: this flows credit, which we might not want, e.g if
         // a drain was issued to stop the link.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5455de3..8e453f9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -494,7 +494,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     @Override
-    public void acknowledge(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+    public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
         checkClosed();
         serializer.execute(new Runnable() {
 
@@ -503,7 +503,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                 try {
                     checkClosed();
                     AmqpSession amqpSession = connection.getSession(sessionId);
-                    amqpSession.acknowledge();
+                    amqpSession.acknowledge(ackType);
                     pumpToProtonTransport(request);
                     request.onSuccess();
                 } catch (Exception error) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 8961258..847d3f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -31,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpConsumerBuilder;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
 import org.apache.qpid.proton.engine.Session;
@@ -61,10 +62,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
     /**
      * Perform an acknowledge of all delivered messages for all consumers active in this
      * Session.
+     *
+     * @param ackType
+     *      controls the acknowledgement that is applied to each message.
      */
-    public void acknowledge() {
+    public void acknowledge(final ACK_TYPE ackType) {
         for (AmqpConsumer consumer : consumers.values()) {
-            consumer.acknowledge();
+            consumer.acknowledge(ackType);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 12604b4..8522fa2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -29,6 +29,7 @@ import javax.jms.TransactionRolledBackException;
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
@@ -62,8 +63,11 @@ public class AmqpSupport {
     public static final Symbol COPY = Symbol.getSymbol("copy");
     public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
     public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+
+    // Delivery states
+    public static final Rejected REJECTED = new Rejected();
     public static final Modified MODIFIED_FAILED = new Modified();
-    public static final Modified MODIFIED_UNDELIVERABLE = new Modified();
+    public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified();
 
     // Temporary Destination constants
     public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
@@ -75,8 +79,8 @@ public class AmqpSupport {
     static {
         MODIFIED_FAILED.setDeliveryFailed(true);
 
-        MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
-        MODIFIED_UNDELIVERABLE.setUndeliverableHere(true);
+        MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true);
+        MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true);
     }
 
     //----- Utility Methods --------------------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index f1676df..ebbc607 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -342,12 +342,12 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     }
 
     @Override
-    public void acknowledge(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
+    public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
         checkClosed();
         final FailoverRequest pending = new FailoverRequest(request) {
             @Override
             public void doTask() throws Exception {
-                provider.acknowledge(sessionId, this);
+                provider.acknowledge(sessionId, ackType, this);
             }
 
             @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org