You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/21 22:46:55 UTC
[4/4] qpid-jms git commit: QPIDJMS-121 Add support for controlling
the message disposition sent on a call to Message.acknowledge when in a
client ack session. The mode is set via a vendor property JMS_AMQP_ACK_TYPE
with values for ACCEPTED, REJECTED, RE
QPIDJMS-121 Add support for controlling the message disposition sent on
a call to Message.acknowledge when in a client ack session. The mode is
set via a vendor property JMS_AMQP_ACK_TYPE with values for ACCEPTED,
REJECTED, RELEASED etc.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ca457d73
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ca457d73
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ca457d73
Branch: refs/heads/master
Commit: ca457d7375233e797fd94fafa18c2e024500236d
Parents: c0bc5f0
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Oct 21 16:27:18 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Oct 21 16:27:18 2015 -0400
----------------------------------------------------------------------
.../apache/qpid/jms/JmsAcknowledgeCallback.java | 90 ++
.../java/org/apache/qpid/jms/JmsConnection.java | 8 +-
.../qpid/jms/JmsLocalTransactionContext.java | 2 +-
.../org/apache/qpid/jms/JmsMessageConsumer.java | 17 +-
.../java/org/apache/qpid/jms/JmsSession.java | 6 +-
.../apache/qpid/jms/message/JmsMapMessage.java | 24 +-
.../org/apache/qpid/jms/message/JmsMessage.java | 120 +--
.../message/JmsMessagePropertyIntercepter.java | 495 ++++++---
.../qpid/jms/message/JmsMessageSupport.java | 10 +
.../jms/message/facade/JmsMessageFacade.java | 1 +
.../org/apache/qpid/jms/provider/Provider.java | 10 +-
.../qpid/jms/provider/ProviderConstants.java | 24 +-
.../qpid/jms/provider/ProviderWrapper.java | 4 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 47 +-
.../qpid/jms/provider/amqp/AmqpProvider.java | 4 +-
.../qpid/jms/provider/amqp/AmqpSession.java | 8 +-
.../qpid/jms/provider/amqp/AmqpSupport.java | 10 +-
.../jms/provider/failover/FailoverProvider.java | 4 +-
.../AmqpAcknowledgementsIntegrationTest.java | 206 ++++
.../JmsMessagePropertyIntercepterTest.java | 1018 +++++++++++++-----
.../apache/qpid/jms/message/JmsMessageTest.java | 13 +-
.../failover/FailoverProviderClosedTest.java | 4 +-
.../qpid/jms/provider/mock/MockProvider.java | 2 +-
23 files changed, 1525 insertions(+), 602 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
new file mode 100644
index 0000000..4923124
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsAcknowledgeCallback.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.apache.qpid.jms.message.JmsMessageSupport.ACCEPTED;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsMessageSupport;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+public class JmsAcknowledgeCallback {
+
+ private final JmsSession session;
+ private int ackType;
+
+ public JmsAcknowledgeCallback(JmsSession session) {
+ this.session = session;
+ }
+
+ public void acknowledge() throws JMSException {
+ if (session.isClosed()) {
+ throw new javax.jms.IllegalStateException("Session closed.");
+ }
+
+ session.acknowledge(lookupAckTypeForDisposition(getAckType()));
+ }
+
+ private ACK_TYPE lookupAckTypeForDisposition(int dispositionType) throws JMSException {
+ switch (dispositionType) {
+ case JmsMessageSupport.ACCEPTED:
+ return ACK_TYPE.ACCEPTED;
+ case JmsMessageSupport.REJECTED:
+ return ACK_TYPE.REJECTED;
+ case JmsMessageSupport.RELEASED:
+ return ACK_TYPE.RELEASED;
+ case JmsMessageSupport.MODIFIED_FAILED:
+ return ACK_TYPE.MODIFIED_FAILED;
+ case JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE:
+ return ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE;
+ default:
+ throw new JMSException("Unable to determine ack type for disposition: " + dispositionType);
+ }
+ }
+
+ /**
+ * @return true if the acknowledgement type was updated.
+ */
+ public boolean isAckTypeSet() {
+ return ackType > 0;
+ }
+
+ /**
+ * Clears any previous setting and restores defaults.
+ */
+ public void clearAckType() {
+ ackType = 0;
+ }
+
+ /**
+ * @return the ackType that has been configured or the default if none has been set.
+ */
+ public int getAckType() {
+ return ackType <= 0 ? ACCEPTED : ackType;
+ }
+
+ /**
+ * Sets the acknowledgement type that will be used.
+ *
+ * @param ackType
+ * the ackType to apply to the session acknowledge.
+ */
+ public void setAckType(int ackType) {
+ this.ackType = ackType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 1686625..3ee98b5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -660,16 +660,16 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
}
}
- void acknowledge(JmsSessionId sessionId) throws JMSException {
- acknowledge(sessionId, null);
+ void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType) throws JMSException {
+ acknowledge(sessionId, ackType, null);
}
- void acknowledge(JmsSessionId sessionId, ProviderSynchronization synchronization) throws JMSException {
+ void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, ProviderSynchronization synchronization) throws JMSException {
checkClosedOrFailed();
try {
ProviderFuture request = new ProviderFuture(synchronization);
- provider.acknowledge(sessionId, request);
+ provider.acknowledge(sessionId, ackType, request);
request.sync();
} catch (Exception ioe) {
throw JmsExceptionSupport.create(ioe);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index 40a4ad1..6a86033 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -88,7 +88,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
@Override
public void acknowledge(JmsConnection connection, JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
// Consumed or delivered messages fall into a transaction otherwise just pass it in.
- if (ackType == ACK_TYPE.CONSUMED || ackType == ACK_TYPE.DELIVERED) {
+ if (ackType == ACK_TYPE.ACCEPTED || ackType == ACK_TYPE.DELIVERED) {
lock.readLock().lock();
try {
connection.acknowledge(envelope, ackType, new ProviderSynchronization() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 8d75093..06f036c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -17,7 +17,6 @@
package org.apache.qpid.jms;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -369,7 +368,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
private JmsInboundMessageDispatch doAckConsumed(final JmsInboundMessageDispatch envelope) throws JMSException {
checkClosed();
try {
- session.acknowledge(envelope, ACK_TYPE.CONSUMED);
+ session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
@@ -398,7 +397,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
private void doAckUndeliverable(final JmsInboundMessageDispatch envelope) throws JMSException {
try {
- session.acknowledge(envelope, ACK_TYPE.POISONED);
+ session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
} catch (JMSException ex) {
session.onException(ex);
throw ex;
@@ -426,17 +425,7 @@ public class JmsMessageConsumer implements MessageConsumer, JmsMessageAvailableC
lock.lock();
try {
if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
- envelope.getMessage().setAcknowledgeCallback(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- if (session.isClosed()) {
- throw new javax.jms.IllegalStateException("Session closed.");
- }
- session.acknowledge();
- envelope.getMessage().setAcknowledgeCallback(null);
- return null;
- }
- });
+ envelope.getMessage().setAcknowledgeCallback(new JmsAcknowledgeCallback(session));
}
if (envelope.isEnqueueFirst()) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index c0a361b..1350c3c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -693,14 +693,16 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
* Acknowledge all previously delivered messages in this Session as consumed. This
* method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
*
+ * @param ackType
+ * The type of acknowledgement being done.
* @throws JMSException if an error occurs while the acknowledge is processed.
*/
- void acknowledge() throws JMSException {
+ void acknowledge(ACK_TYPE ackType) throws JMSException {
if (isTransacted()) {
throw new IllegalStateException("Session acknowledge called inside a transacted Session");
}
- this.connection.acknowledge(sessionInfo.getId());
+ this.connection.acknowledge(sessionInfo.getId(), ackType);
}
public boolean isClosed() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
index e5184cd..44327e4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
@@ -275,11 +275,7 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
@Override
public void setObject(String name, Object value) throws JMSException {
- // byte[] not allowed on properties so cover that here.
- if (!(value instanceof byte[])) {
- checkValidObject(value);
- }
-
+ checkValidObject(value);
put(name, value);
}
@@ -316,4 +312,22 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
throw new IllegalArgumentException("Map key name must not be the empty string");
}
}
+
+ private void checkValidObject(Object value) throws MessageFormatException {
+ boolean valid = value instanceof Boolean ||
+ value instanceof Byte ||
+ value instanceof Short ||
+ value instanceof Integer ||
+ value instanceof Long ||
+ value instanceof Float ||
+ value instanceof Double ||
+ value instanceof Character ||
+ value instanceof String ||
+ value instanceof byte[] ||
+ value == null;
+
+ if (!valid) {
+ throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index fb788fe..f2123c7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -20,7 +20,6 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.Callable;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -29,6 +28,7 @@ import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
+import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
@@ -37,7 +37,7 @@ import org.apache.qpid.jms.util.TypeConversionSupport;
public class JmsMessage implements javax.jms.Message {
private static final String ID_PREFIX = "ID:";
- protected transient Callable<Void> acknowledgeCallback;
+ protected transient JmsAcknowledgeCallback acknowledgeCallback;
protected transient JmsConnection connection;
protected final JmsMessageFacade facade;
@@ -94,7 +94,8 @@ public class JmsMessage implements javax.jms.Message {
public void acknowledge() throws JMSException {
if (acknowledgeCallback != null) {
try {
- acknowledgeCallback.call();
+ acknowledgeCallback.acknowledge();
+ acknowledgeCallback = null;
} catch (Throwable e) {
throw JmsExceptionSupport.create(e);
}
@@ -248,38 +249,17 @@ public class JmsMessage implements javax.jms.Message {
@Override
public void clearProperties() throws JMSException {
- JmsMessagePropertyIntercepter.clearProperties(facade, true);
- setReadOnlyProperties(false);
+ JmsMessagePropertyIntercepter.clearProperties(this, true);
}
@Override
public boolean propertyExists(String name) throws JMSException {
- try {
- checkPropertyNameIsValid(name);
- } catch (IllegalArgumentException iae) {
- return false;
- }
-
- return JmsMessagePropertyIntercepter.propertyExists(facade, name);
+ return JmsMessagePropertyIntercepter.propertyExists(this, name);
}
@Override
public Enumeration<?> getPropertyNames() throws JMSException {
- Set<String> result = new HashSet<String>();
-
- Set<String> propertyNames = JmsMessagePropertyIntercepter.getPropertyNames(facade, true);
- for (String name : propertyNames) {
- try {
- checkPropertyNameIsValid(name);
- } catch (IllegalArgumentException iae) {
- // Don't add the name
- continue;
- }
-
- result.add(name);
- }
-
- return Collections.enumeration(result);
+ return Collections.enumeration(JmsMessagePropertyIntercepter.getPropertyNames(this, true));
}
/**
@@ -292,39 +272,18 @@ public class JmsMessage implements javax.jms.Message {
*/
public Enumeration<?> getAllPropertyNames() throws JMSException {
Set<String> result = new HashSet<String>();
- result.addAll(JmsMessagePropertyIntercepter.getAllPropertyNames(facade));
+ result.addAll(JmsMessagePropertyIntercepter.getAllPropertyNames(this));
return Collections.enumeration(result);
}
@Override
public void setObjectProperty(String name, Object value) throws JMSException {
- checkReadOnlyProperties();
- checkPropertyNameIsValid(name);
- checkValidObject(value);
- JmsMessagePropertyIntercepter.setProperty(facade, name, value);
- }
-
- protected void checkValidObject(Object value) throws MessageFormatException {
- boolean valid = value instanceof Boolean ||
- value instanceof Byte ||
- value instanceof Short ||
- value instanceof Integer ||
- value instanceof Long ||
- value instanceof Float ||
- value instanceof Double ||
- value instanceof Character ||
- value instanceof String ||
- value == null;
-
- if (!valid) {
- throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
- }
+ JmsMessagePropertyIntercepter.setProperty(this, name, value);
}
@Override
public Object getObjectProperty(String name) throws JMSException {
- checkPropertyNameIsValid(name);
- return JmsMessagePropertyIntercepter.getProperty(facade, name);
+ return JmsMessagePropertyIntercepter.getProperty(this, name);
}
@Override
@@ -471,12 +430,12 @@ public class JmsMessage implements javax.jms.Message {
setObjectProperty(name, value);
}
- public Callable<Void> getAcknowledgeCallback() {
+ public JmsAcknowledgeCallback getAcknowledgeCallback() {
return acknowledgeCallback;
}
- public void setAcknowledgeCallback(Callable<Void> acknowledgeCallback) {
- this.acknowledgeCallback = acknowledgeCallback;
+ public void setAcknowledgeCallback(JmsAcknowledgeCallback jmsAcknowledgeCallback) {
+ this.acknowledgeCallback = jmsAcknowledgeCallback;
}
/**
@@ -530,6 +489,8 @@ public class JmsMessage implements javax.jms.Message {
return "JmsMessage { " + facade + " }";
}
+ //----- State validation methods -----------------------------------------//
+
protected void checkReadOnlyProperties() throws MessageNotWriteableException {
if (readOnlyProperties) {
throw new MessageNotWriteableException("Message properties are read-only");
@@ -547,55 +508,4 @@ public class JmsMessage implements javax.jms.Message {
throw new MessageNotReadableException("Message body is write-only");
}
}
-
- private void checkPropertyNameIsValid(String propertyName) throws IllegalArgumentException {
- if (propertyName == null) {
- throw new IllegalArgumentException("Property name must not be null");
- } else if (propertyName.length() == 0) {
- throw new IllegalArgumentException("Property name must not be the empty string");
- }
-
- if (isValidatePropertyNames()) {
- checkIdentifierLetterAndDigitRequirements(propertyName);
- checkIdentifierIsntNullTrueFalse(propertyName);
- checkIdentifierIsntLogicOperator(propertyName);
- }
- }
-
- private void checkIdentifierIsntLogicOperator(String identifier) {
- // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
- if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
- "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
- "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
-
- throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
- }
- }
-
- private void checkIdentifierIsntNullTrueFalse(String identifier) {
- // Identifiers cannot be the names NULL, TRUE, and FALSE.
- if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
- throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
- }
- }
-
- private void checkIdentifierLetterAndDigitRequirements(String identifier) {
- // An identifier is an unlimited-length sequence of letters and digits, the first of
- // which must be a letter. A letter is any character for which the method
- // Character.isJavaLetter returns true. This includes '_' and '$'. A letter or digit
- // is any character for which the method Character.isJavaLetterOrDigit returns true.
- char startChar = identifier.charAt(0);
- if (!(Character.isJavaIdentifierStart(startChar))) {
- throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
- }
-
- // JMS part character
- int length = identifier.length();
- for (int i = 1; i < length; i++) {
- char ch = identifier.charAt(i);
- if (!(Character.isJavaIdentifierPart(ch))) {
- throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 4dd3c13..a502760 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -20,6 +20,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_DELIVERY_COUNT;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPID;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
@@ -41,9 +42,9 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageFormatException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
-import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.util.TypeConversionSupport;
/**
@@ -55,6 +56,7 @@ public class JmsMessagePropertyIntercepter {
private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS =
new HashMap<String, PropertyIntercepter>();
private static final Set<String> STANDARD_HEADERS = new HashSet<String>();
+ private static final Set<String> VENDOR_PROPERTIES = new HashSet<String>();
/**
* Interface for a Property intercepter object used to write JMS style
@@ -75,7 +77,7 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs while accessing the property
*/
- Object getProperty(JmsMessageFacade message) throws JMSException;
+ Object getProperty(JmsMessage message) throws JMSException;
/**
* Called when the names property is assigned from an JMS Message object.
@@ -87,7 +89,7 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs writing the property.
*/
- void setProperty(JmsMessageFacade message, Object value) throws JMSException;
+ void setProperty(JmsMessage message, Object value) throws JMSException;
/**
* Indicates if the intercepted property has a value currently assigned.
@@ -97,7 +99,7 @@ public class JmsMessagePropertyIntercepter {
*
* @return true if the intercepted property has a value assigned to it.
*/
- boolean propertyExists(JmsMessageFacade message);
+ boolean propertyExists(JmsMessage message);
/**
* Request that the intercepted property be cleared. For properties that
@@ -109,7 +111,16 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs clearing the property.
*/
- void clearProperty(JmsMessageFacade message) throws JMSException;
+ void clearProperty(JmsMessage message) throws JMSException;
+
+ /**
+ * Return true if the intercepter can bypass the read-only state of a Message
+ * and its properties.
+ *
+ * @return true if the intercepter is immune to read-only state checks.
+ */
+ boolean isAlwaysWritable();
+
}
static {
@@ -124,15 +135,17 @@ public class JmsMessagePropertyIntercepter {
STANDARD_HEADERS.add(JMS_EXPIRATION);
STANDARD_HEADERS.add(JMS_PRIORITY);
+ VENDOR_PROPERTIES.add(JMS_AMQP_ACK_TYPE);
+
PROPERTY_INTERCEPTERS.put(JMS_DESTINATION, new PropertyIntercepter() {
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
throw new JMSException("Cannot set JMS Destination as a property, use setJMSDestination() instead");
}
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- Destination dest = message.getDestination();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ Destination dest = message.getFacade().getDestination();
if (dest == null) {
return null;
}
@@ -140,72 +153,87 @@ public class JmsMessagePropertyIntercepter {
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getDestination() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getDestination() != null;
+ }
+
+ @Override
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setDestination(null);
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setDestination(null);
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_REPLYTO, new PropertyIntercepter() {
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
throw new JMSException("Cannot set JMS ReplyTo as a property, use setJMSReplTo() instead");
}
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- if (message.getReplyTo() == null) {
+ public Object getProperty(JmsMessage message) throws JMSException {
+ if (message.getFacade().getReplyTo() == null) {
return null;
}
- return message.getReplyTo().toString();
+ return message.getFacade().getReplyTo().toString();
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getReplyTo() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getReplyTo() != null;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setReplyTo(null);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setReplyTo(null);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_TYPE, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return message.getType();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return message.getFacade().getType();
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
String rc = (String) TypeConversionSupport.convert(value, String.class);
if (rc == null) {
throw new JMSException("Property JMSType cannot be set from a " + value.getClass().getName() + ".");
}
- message.setType(rc);
+ message.getFacade().setType(rc);
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getType() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getType() != null;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setType(null);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setType(null);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_DELIVERY_MODE, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return message.isPersistent() ? "PERSISTENT" : "NON_PERSISTENT";
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return message.getFacade().isPersistent() ? "PERSISTENT" : "NON_PERSISTENT";
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Integer rc = null;
try {
rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
@@ -227,258 +255,308 @@ public class JmsMessagePropertyIntercepter {
if (bool == null) {
throw new JMSException("Property JMSDeliveryMode cannot be set from a " + value.getClass().getName() + ".");
} else {
- message.setPersistent(bool.booleanValue());
+ message.getFacade().setPersistent(bool.booleanValue());
}
} else {
- message.setPersistent(rc == DeliveryMode.PERSISTENT);
+ message.getFacade().setPersistent(rc == DeliveryMode.PERSISTENT);
}
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
+ public boolean propertyExists(JmsMessage message) {
return true;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setPersistent(true); // Default value
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setPersistent(true); // Default value
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_PRIORITY, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return Integer.valueOf(message.getPriority());
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Integer.valueOf(message.getFacade().getPriority());
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
if (rc == null) {
throw new JMSException("Property JMSPriority cannot be set from a " + value.getClass().getName() + ".");
}
- message.setPriority(rc.byteValue());
+ message.getFacade().setPriority(rc.byteValue());
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
+ public boolean propertyExists(JmsMessage message) {
return true;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setPriority(Message.DEFAULT_PRIORITY);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setPriority(Message.DEFAULT_PRIORITY);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_MESSAGEID, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- if (message.getMessageId() == null) {
+ public Object getProperty(JmsMessage message) throws JMSException {
+ if (message.getFacade().getMessageId() == null) {
return null;
}
- return message.getMessageId();
+ return message.getFacade().getMessageId();
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
String rc = (String) TypeConversionSupport.convert(value, String.class);
if (rc == null) {
throw new JMSException("Property JMSMessageID cannot be set from a " + value.getClass().getName() + ".");
}
- message.setMessageId(rc);
+ message.getFacade().setMessageId(rc);
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getMessageId() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getMessageId() != null;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setMessageId(null);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setMessageId(null);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_TIMESTAMP, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return Long.valueOf(message.getTimestamp());
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Long.valueOf(message.getFacade().getTimestamp());
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
if (rc == null) {
throw new JMSException("Property JMSTimestamp cannot be set from a " + value.getClass().getName() + ".");
}
- message.setTimestamp(rc.longValue());
+ message.getFacade().setTimestamp(rc.longValue());
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getTimestamp() > 0;
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getTimestamp() > 0;
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setTimestamp(0);
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setTimestamp(0);
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_CORRELATIONID, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return message.getCorrelationId();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return message.getFacade().getCorrelationId();
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
String rc = (String) TypeConversionSupport.convert(value, String.class);
if (rc == null) {
throw new JMSException("Property JMSCorrelationID cannot be set from a " + value.getClass().getName() + ".");
}
- message.setCorrelationId(rc);
+ message.getFacade().setCorrelationId(rc);
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getCorrelationId() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getCorrelationId() != null;
}
@Override
- public void clearProperty(JmsMessageFacade message) throws JMSException {
- message.setCorrelationId(null);
+ public void clearProperty(JmsMessage message) throws JMSException {
+ message.getFacade().setCorrelationId(null);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_EXPIRATION, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return Long.valueOf(message.getExpiration());
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Long.valueOf(message.getFacade().getExpiration());
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
if (rc == null) {
throw new JMSException("Property JMSExpiration cannot be set from a " + value.getClass().getName() + ".");
}
- message.setExpiration(rc.longValue());
+ message.getFacade().setExpiration(rc.longValue());
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getExpiration() > 0;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getExpiration() > 0;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setExpiration(0);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setExpiration(0);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMS_REDELIVERED, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return Boolean.valueOf(message.isRedelivered());
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Boolean.valueOf(message.getFacade().isRedelivered());
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
if (rc == null) {
throw new JMSException("Property JMSRedelivered cannot be set from a " + value.getClass().getName() + ".");
}
- message.setRedelivered(rc.booleanValue());
+ message.getFacade().setRedelivered(rc.booleanValue());
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().isRedelivered();
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.isRedelivered();
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setRedelivered(false);
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setRedelivered(false);
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMSX_DELIVERY_COUNT, new PropertyIntercepter() {
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
if (rc == null) {
throw new JMSException("Property JMSXDeliveryCount cannot be set from a " + value.getClass().getName() + ".");
}
- message.setDeliveryCount(rc.intValue());
+ message.getFacade().setDeliveryCount(rc.intValue());
}
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return Integer.valueOf(message.getDeliveryCount());
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Integer.valueOf(message.getFacade().getDeliveryCount());
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
+ public boolean propertyExists(JmsMessage message) {
return true;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setDeliveryCount(1);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setDeliveryCount(1);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMSX_GROUPID, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return message.getGroupId();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return message.getFacade().getGroupId();
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
String rc = (String) TypeConversionSupport.convert(value, String.class);
if (rc == null) {
throw new JMSException("Property JMSXGroupID cannot be set from a " + value.getClass().getName() + ".");
}
- message.setGroupId(rc);
+ message.getFacade().setGroupId(rc);
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getGroupId() != null;
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getGroupId() != null;
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setGroupId(null);
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setGroupId(null);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMSX_GROUPSEQ, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- return message.getGroupSequence();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return message.getFacade().getGroupSequence();
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
if (rc == null) {
throw new JMSException("Property JMSXGroupSeq cannot be set from a " + value.getClass().getName() + ".");
}
- message.setGroupSequence(rc.intValue());
+ message.getFacade().setGroupSequence(rc.intValue());
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getGroupSequence() != 0;
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getGroupSequence() != 0;
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setGroupSequence(0);
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setGroupSequence(0);
+ public boolean isAlwaysWritable() {
+ return false;
}
});
PROPERTY_INTERCEPTERS.put(JMSX_USERID, new PropertyIntercepter() {
@Override
- public Object getProperty(JmsMessageFacade message) throws JMSException {
- Object userId = message.getUserId();
+ public Object getProperty(JmsMessage message) throws JMSException {
+ Object userId = message.getFacade().getUserId();
if (userId == null) {
try {
- userId = message.getProperty("JMSXUserID");
+ userId = message.getFacade().getProperty("JMSXUserID");
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
@@ -488,21 +566,75 @@ public class JmsMessagePropertyIntercepter {
}
@Override
- public void setProperty(JmsMessageFacade message, Object value) throws JMSException {
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
if (!(value instanceof String)) {
throw new JMSException("Property JMSXUserID cannot be set from a " + value.getClass().getName() + ".");
}
- message.setUserId((String) value);
+ message.getFacade().setUserId((String) value);
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getUserId() != null;
}
@Override
- public boolean propertyExists(JmsMessageFacade message) {
- return message.getUserId() != null;
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setUserId(null);
}
@Override
- public void clearProperty(JmsMessageFacade message) {
- message.setUserId(null);
+ public boolean isAlwaysWritable() {
+ return false;
+ }
+ });
+ PROPERTY_INTERCEPTERS.put(JMS_AMQP_ACK_TYPE, new PropertyIntercepter() {
+ @Override
+ public Object getProperty(JmsMessage message) throws JMSException {
+ Object ackType = null;
+
+ if (message.getAcknowledgeCallback() != null &&
+ message.getAcknowledgeCallback().isAckTypeSet()) {
+
+ ackType = message.getAcknowledgeCallback().getAckType();
+ }
+
+ return ackType;
+ }
+
+ @Override
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
+ if (message.getAcknowledgeCallback() == null) {
+ throw new JMSException("Session Acknoewledgement Mode does not allow setting: " + JMS_AMQP_ACK_TYPE);
+ }
+
+ Integer ackType = (Integer) TypeConversionSupport.convert(value, Integer.class);
+ if (ackType == null) {
+ throw new JMSException("Property " + JMS_AMQP_ACK_TYPE + " cannot be set from a " + value.getClass().getName() + ".");
+ }
+
+ message.getAcknowledgeCallback().setAckType(ackType);
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ if (message.getAcknowledgeCallback() != null) {
+ return message.getAcknowledgeCallback().isAckTypeSet();
+ }
+
+ return false;
+ }
+
+ @Override
+ public void clearProperty(JmsMessage message) throws JMSException {
+ if (message.getAcknowledgeCallback() != null) {
+ message.getAcknowledgeCallback().clearAckType();
+ }
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return true;
}
});
}
@@ -513,7 +645,7 @@ public class JmsMessagePropertyIntercepter {
* method.
*
* @param message
- * the JmsMessageFacade instance to read from
+ * the JmsMessage instance to read from
* @param name
* the property name that is being requested.
*
@@ -521,14 +653,16 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs while reading the defined property.
*/
- public static Object getProperty(JmsMessageFacade message, String name) throws JMSException {
+ public static Object getProperty(JmsMessage message, String name) throws JMSException {
Object value = null;
+ checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+
PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
if (jmsPropertyExpression != null) {
value = jmsPropertyExpression.getProperty(message);
} else {
- value = message.getProperty(name);
+ value = message.getFacade().getProperty(name);
}
return value;
@@ -540,7 +674,7 @@ public class JmsMessagePropertyIntercepter {
* method.
*
* @param message
- * the JmsMessageFacade instance to write to.
+ * the JmsMessage instance to write to.
* @param name
* the property name that is being written.
* @param value
@@ -548,12 +682,19 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs while writing the defined property.
*/
- public static void setProperty(JmsMessageFacade message, String name, Object value) throws JMSException {
+ public static void setProperty(JmsMessage message, String name, Object value) throws JMSException {
PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
+
+ if (jmsPropertyExpression == null || !jmsPropertyExpression.isAlwaysWritable()) {
+ message.checkReadOnlyProperties();
+ }
+ checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+ checkValidObject(value);
+
if (jmsPropertyExpression != null) {
jmsPropertyExpression.setProperty(message, value);
} else {
- message.setProperty(name, value);
+ message.getFacade().setProperty(name, value);
}
}
@@ -561,7 +702,7 @@ public class JmsMessagePropertyIntercepter {
* Static inspection method to determine if a named property exists for a given message.
*
* @param message
- * the JmsMessageFacade instance to read from
+ * the JmsMessage instance to read from
* @param name
* the property name that is being inspected.
*
@@ -569,12 +710,18 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs while validating the defined property.
*/
- public static boolean propertyExists(JmsMessageFacade message, String name) throws JMSException {
+ public static boolean propertyExists(JmsMessage message, String name) throws JMSException {
+ try {
+ checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+ } catch (IllegalArgumentException iae) {
+ return false;
+ }
+
PropertyIntercepter jmsPropertyExpression = PROPERTY_INTERCEPTERS.get(name);
if (jmsPropertyExpression != null) {
return jmsPropertyExpression.propertyExists(message);
} else {
- return message.propertyExists(name);
+ return message.getFacade().propertyExists(name);
}
}
@@ -584,13 +731,13 @@ public class JmsMessagePropertyIntercepter {
* message facade to clear any message properties that might have been set.
*
* @param message
- * the JmsMessageFacade instance to read from
+ * the JmsMessage instance to read from
* @param excludeStandardJMSHeaders
* whether the standard JMS header names should be excluded from the returned set
*
* @throws JMSException if an error occurs while validating the defined property.
*/
- public static void clearProperties(JmsMessageFacade message, boolean excludeStandardJMSHeaders) throws JMSException {
+ public static void clearProperties(JmsMessage message, boolean excludeStandardJMSHeaders) throws JMSException {
for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) {
if (excludeStandardJMSHeaders && STANDARD_HEADERS.contains(entry.getKey())) {
continue;
@@ -599,7 +746,8 @@ public class JmsMessagePropertyIntercepter {
entry.getValue().clearProperty(message);
}
- message.clearProperties();
+ message.getFacade().clearProperties();
+ message.setReadOnlyProperties(false);
}
/**
@@ -607,15 +755,15 @@ public class JmsMessagePropertyIntercepter {
* string key value is inserted into an Set and returned.
*
* @param message
- * the JmsMessageFacade instance to read property names from.
+ * the JmsMessage instance to read property names from.
*
* @return a {@code Set<String>} containing the names of all intercepted properties.
*
* @throws JMSException if an error occurs while gathering the message property names.
*/
- public static Set<String> getAllPropertyNames(JmsMessageFacade message) throws JMSException {
+ public static Set<String> getAllPropertyNames(JmsMessage message) throws JMSException {
Set<String> names = new HashSet<String>(PROPERTY_INTERCEPTERS.keySet());
- names.addAll(message.getPropertyNames());
+ names.addAll(message.getFacade().getPropertyNames());
return names;
}
@@ -627,7 +775,7 @@ public class JmsMessagePropertyIntercepter {
* will be returned if there are no matching properties.
*
* @param message
- * the JmsMessageFacade instance to read from
+ * the JmsMessage instance to read from
* @param excludeStandardJMSHeaders
* whether the standard JMS header names should be excluded from the returned set
*
@@ -635,7 +783,7 @@ public class JmsMessagePropertyIntercepter {
*
* @throws JMSException if an error occurs while gathering the message property names.
*/
- public static Set<String> getPropertyNames(JmsMessageFacade message, boolean excludeStandardJMSHeaders) throws JMSException {
+ public static Set<String> getPropertyNames(JmsMessage message, boolean excludeStandardJMSHeaders) throws JMSException {
Set<String> names = new HashSet<String>();
for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) {
if (excludeStandardJMSHeaders && STANDARD_HEADERS.contains(entry.getKey())) {
@@ -647,8 +795,87 @@ public class JmsMessagePropertyIntercepter {
}
}
- names.addAll(message.getPropertyNames());
+ for (String name : message.getFacade().getPropertyNames()) {
+ try {
+ checkPropertyNameIsValid(name, message.isValidatePropertyNames());
+ } catch (IllegalArgumentException iae) {
+ // Don't add the name
+ continue;
+ }
+
+ names.add(name);
+ }
return names;
}
+
+ //----- Property Validation Methods --------------------------------------//
+
+ private static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
+ if (propertyName == null) {
+ throw new IllegalArgumentException("Property name must not be null");
+ } else if (propertyName.length() == 0) {
+ throw new IllegalArgumentException("Property name must not be the empty string");
+ }
+
+ if (validateNames) {
+ checkIdentifierLetterAndDigitRequirements(propertyName);
+ checkIdentifierIsntNullTrueFalse(propertyName);
+ checkIdentifierIsntLogicOperator(propertyName);
+ }
+ }
+
+ private static void checkIdentifierIsntLogicOperator(String identifier) {
+ // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
+ if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
+ "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
+ "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
+
+ throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+ }
+ }
+
+ private static void checkIdentifierIsntNullTrueFalse(String identifier) {
+ // Identifiers cannot be the names NULL, TRUE, and FALSE.
+ if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
+ throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+ }
+ }
+
+ private static void checkIdentifierLetterAndDigitRequirements(String identifier) {
+ // An identifier is an unlimited-length sequence of letters and digits, the first of
+ // which must be a letter. A letter is any character for which the method
+ // Character.isJavaLetter returns true. This includes '_' and '$'. A letter or digit
+ // is any character for which the method Character.isJavaLetterOrDigit returns true.
+ char startChar = identifier.charAt(0);
+ if (!(Character.isJavaIdentifierStart(startChar))) {
+ throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
+ }
+
+ // JMS part character
+ int length = identifier.length();
+ for (int i = 1; i < length; i++) {
+ char ch = identifier.charAt(i);
+ if (!(Character.isJavaIdentifierPart(ch))) {
+ throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
+ }
+ }
+ }
+
+ private static void checkValidObject(Object value) throws MessageFormatException {
+ boolean valid = value instanceof Boolean ||
+ value instanceof Byte ||
+ value instanceof Short ||
+ value instanceof Integer ||
+ value instanceof Long ||
+ value instanceof Float ||
+ value instanceof Double ||
+ value instanceof Character ||
+ value instanceof String ||
+ value == null;
+
+ if (!valid) {
+ throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index 29a70b4..31f2120 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -37,4 +37,14 @@ public class JmsMessageSupport {
public static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
public static final String JMSX_USERID = "JMSXUserID";
+ public static final String JMS_AMQP_ACK_TYPE = "JMS_AMQP_ACK_TYPE";
+
+ // TODO: advise not using these constants, since doing so wont be portable?
+ // Make them package private so they can't be used to begin with?
+ public static final int ACCEPTED = 1;
+ public static final int REJECTED = 2;
+ public static final int RELEASED = 3;
+ public static final int MODIFIED_FAILED = 4;
+ public static final int MODIFIED_FAILED_UNDELIVERABLE = 5;
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index 15bde3c..9a972c1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -406,4 +406,5 @@ public interface JmsMessageFacade {
* The message ID to set on this message, or null to clear.
*/
void setProviderMessageIdObject(Object messageId);
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index ad5993f..d430abf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -187,29 +187,31 @@ public interface Provider {
* Called to acknowledge all messages that have been delivered in a given session.
*
* This method is typically used by a Session that is configured for client acknowledge
- * mode. The acknowledgment should only be applied to Messages that have been marked
+ * mode. The acknowledgement should only be applied to Messages that have been marked
* as delivered and not those still awaiting dispatch.
*
* @param sessionId
* the ID of the Session whose delivered messages should be acknowledged.
+ * @param ackType
+ * The type of acknowledgement being done.
* @param request
* The request object that should be signaled when this operation completes.
*
* @throws IOException if an error occurs or the Provider is already closed.
* @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
*/
- void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+ void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException;
/**
* Called to acknowledge a JmsMessage has been delivered, consumed, re-delivered...etc.
*
- * The provider should perform an acknowledgment for the message based on the configured
+ * The provider should perform an acknowledgement for the message based on the configured
* mode of the consumer that it was dispatched to and the capabilities of the protocol.
*
* @param envelope
* The message dispatch envelope containing the Message delivery information.
* @param ackType
- * The type of acknowledgment being done.
+ * The type of acknowledgement being done.
* @param request
* The request object that should be signaled when this operation completes.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
index 9071ba8..1742a13 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -24,20 +24,14 @@ public final class ProviderConstants {
private ProviderConstants() {}
public enum ACK_TYPE {
- DELIVERED(0),
- CONSUMED(1),
- POISONED(2),
- EXPIRED(3),
- RELEASED(4);
-
- private final int value;
-
- private ACK_TYPE(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
+ // Aligned with AMQP dispositions
+ ACCEPTED,
+ RELEASED,
+ REJECTED,
+ MODIFIED_FAILED,
+ MODIFIED_FAILED_UNDELIVERABLE,
+ // Conceptual
+ DELIVERED,
+ EXPIRED;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index c053c22..bc4073c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -98,8 +98,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
}
@Override
- public void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
- next.acknowledge(sessionId, request);
+ public void acknowledge(JmsSessionId sessionId, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
+ next.acknowledge(sessionId, ackType, request);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 877cd1b..1b61313 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -17,7 +17,8 @@
package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
-import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_UNDELIVERABLE;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -167,13 +168,35 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
* Only messages that have already been acknowledged as delivered by the JMS
* framework will be in the delivered Map. This means that the link credit
* would already have been given for these so we just need to settle them.
+ *
+ * @param ackType the type of acknowledgement to perform
*/
- public void acknowledge() {
- LOG.trace("Session Acknowledge for consumer: {}", getResourceInfo().getId());
+ public void acknowledge(ACK_TYPE ackType) {
+ LOG.trace("Session Acknowledge for consumer {} with ack type {}", getResourceInfo().getId(), ackType);
for (Delivery delivery : delivered.values()) {
- delivery.disposition(Accepted.getInstance());
+ switch (ackType) {
+ case ACCEPTED:
+ delivery.disposition(Accepted.getInstance());
+ break;
+ case RELEASED:
+ delivery.disposition(Released.getInstance());
+ break;
+ case REJECTED:
+ delivery.disposition(REJECTED);
+ break;
+ case MODIFIED_FAILED:
+ delivery.disposition(MODIFIED_FAILED);
+ break;
+ case MODIFIED_FAILED_UNDELIVERABLE:
+ delivery.disposition(MODIFIED_FAILED_UNDELIVERABLE);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid acknowledgement type specified: " + ackType);
+ }
+
delivery.settle();
}
+
delivered.clear();
}
@@ -209,13 +232,13 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
setDefaultDeliveryState(delivery, MODIFIED_FAILED);
sendFlowIfNeeded();
- } else if (ackType.equals(ACK_TYPE.CONSUMED)) {
+ } else if (ackType.equals(ACK_TYPE.ACCEPTED)) {
// A Consumer may not always send a DELIVERED ack so we need to
// check to ensure we don't add too much credit to the link.
if (isPresettle() || delivered.remove(envelope) == null) {
sendFlowIfNeeded();
}
- LOG.debug("Consumed Ack of message: {}", envelope);
+ LOG.debug("Accepted Ack of message: {}", envelope);
if (!delivery.isSettled()) {
if (session.isTransacted() && !getResourceInfo().isBrowser()) {
@@ -238,10 +261,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
delivery.settle();
}
}
- } else if (ackType.equals(ACK_TYPE.POISONED)) {
- deliveryFailed(delivery);
+ } else if (ackType.equals(ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE)) {
+ deliveryFailedUndeliverable(delivery);
} else if (ackType.equals(ACK_TYPE.EXPIRED)) {
- deliveryFailed(delivery);
+ deliveryFailedUndeliverable(delivery);
} else if (ackType.equals(ACK_TYPE.RELEASED)) {
delivery.disposition(Released.getInstance());
delivery.settle();
@@ -399,7 +422,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// In the future once the JMS mapping is complete we should be
// able to convert everything to some message even if its just
// a bytes messages as a fall back.
- deliveryFailed(incoming);
+ deliveryFailedUndeliverable(incoming);
return false;
}
@@ -482,8 +505,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
return "AmqpConsumer { " + getResourceInfo().getId() + " }";
}
- protected void deliveryFailed(Delivery incoming) {
- incoming.disposition(MODIFIED_UNDELIVERABLE);
+ protected void deliveryFailedUndeliverable(Delivery incoming) {
+ incoming.disposition(MODIFIED_FAILED_UNDELIVERABLE);
incoming.settle();
// TODO: this flows credit, which we might not want, e.g if
// a drain was issued to stop the link.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 5455de3..8e453f9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -494,7 +494,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
@Override
- public void acknowledge(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
+ public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
checkClosed();
serializer.execute(new Runnable() {
@@ -503,7 +503,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
try {
checkClosed();
AmqpSession amqpSession = connection.getSession(sessionId);
- amqpSession.acknowledge();
+ amqpSession.acknowledge(ackType);
pumpToProtonTransport(request);
request.onSuccess();
} catch (Exception error) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 8961258..847d3f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -31,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.amqp.builders.AmqpConsumerBuilder;
import org.apache.qpid.jms.provider.amqp.builders.AmqpProducerBuilder;
import org.apache.qpid.proton.engine.Session;
@@ -61,10 +62,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i
/**
* Perform an acknowledge of all delivered messages for all consumers active in this
* Session.
+ *
+ * @param ackType
+ * controls the acknowledgement that is applied to each message.
*/
- public void acknowledge() {
+ public void acknowledge(final ACK_TYPE ackType) {
for (AmqpConsumer consumer : consumers.values()) {
- consumer.acknowledge();
+ consumer.acknowledge(ackType);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 12604b4..8522fa2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -29,6 +29,7 @@ import javax.jms.TransactionRolledBackException;
import org.apache.qpid.jms.provider.ProviderRedirectedException;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.TransactionErrors;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
@@ -62,8 +63,11 @@ public class AmqpSupport {
public static final Symbol COPY = Symbol.getSymbol("copy");
public static final Symbol JMS_NO_LOCAL_SYMBOL = Symbol.valueOf("no-local");
public static final Symbol JMS_SELECTOR_SYMBOL = Symbol.valueOf("jms-selector");
+
+ // Delivery states
+ public static final Rejected REJECTED = new Rejected();
public static final Modified MODIFIED_FAILED = new Modified();
- public static final Modified MODIFIED_UNDELIVERABLE = new Modified();
+ public static final Modified MODIFIED_FAILED_UNDELIVERABLE = new Modified();
// Temporary Destination constants
public static final Symbol DYNAMIC_NODE_LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
@@ -75,8 +79,8 @@ public class AmqpSupport {
static {
MODIFIED_FAILED.setDeliveryFailed(true);
- MODIFIED_UNDELIVERABLE.setDeliveryFailed(true);
- MODIFIED_UNDELIVERABLE.setUndeliverableHere(true);
+ MODIFIED_FAILED_UNDELIVERABLE.setDeliveryFailed(true);
+ MODIFIED_FAILED_UNDELIVERABLE.setUndeliverableHere(true);
}
//----- Utility Methods --------------------------------------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index f1676df..ebbc607 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -342,12 +342,12 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
@Override
- public void acknowledge(final JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
+ public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
checkClosed();
final FailoverRequest pending = new FailoverRequest(request) {
@Override
public void doTask() throws Exception {
- provider.acknowledge(sessionId, this);
+ provider.acknowledge(sessionId, ackType, this);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org