You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/03/03 22:27:27 UTC
svn commit: r1076800 - in
/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java messaging/address/AddressHelper.java
messaging/address/Link.java
Author: rajith
Date: Thu Mar 3 21:27:26 2011
New Revision: 1076800
URL: http://svn.apache.org/viewvc?rev=1076800&view=rev
Log:
QPID-2732
Only UNRELIABLE and AT_LEAST_ONCE is supported.
Currently the reliability mode is used only for determining the accept-mode.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1076800&r1=1076799&r2=1076800&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar 3 21:27:26 2011
@@ -47,6 +47,8 @@ import org.apache.qpid.client.message.AM
import org.apache.qpid.client.message.FieldTableSupport;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage_0_10;
+import org.apache.qpid.client.messaging.address.Link;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -601,10 +603,16 @@ public class AMQSession_0_10 extends AMQ
(Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
}
+ boolean acceptModeNone = false;
+
+ if (consumer.getDestination().getLink() != null)
+ {
+ acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
+ }
getQpidSession().messageSubscribe
(queueName.toString(), String.valueOf(tag),
- getAcknowledgeMode() == NO_ACKNOWLEDGE ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1076800&r1=1076799&r2=1076800&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Thu Mar 3 21:27:26 2011
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
import org.apache.qpid.client.messaging.address.Link.Subscription;
import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
import org.apache.qpid.client.messaging.address.Node.QueueNode;
@@ -262,7 +263,7 @@ public class AddressHelper
}
}
- public Link getLink()
+ public Link getLink() throws Exception
{
Link link = new Link();
link.setSubscription(new Subscription());
@@ -272,6 +273,25 @@ public class AddressHelper
: linkProps.getBoolean(DURABLE));
link.setName(linkProps.getString(NAME));
+ String reliability = linkProps.getString(RELIABILITY);
+ if ( reliability != null)
+ {
+ if (reliability.equalsIgnoreCase("unreliable"))
+ {
+ link.setReliability(Reliability.UNRELIABLE);
+ }
+ else if (reliability.equalsIgnoreCase("at-least-once"))
+ {
+ link.setReliability(Reliability.AT_LEAST_ONCE);
+ }
+ else
+ {
+ throw new Exception("The reliability mode '" +
+ reliability + "' is not yet supported");
+ }
+
+ }
+
if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
{
MapAccessor capacityProps = new MapAccessor(
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1076800&r1=1076799&r2=1076800&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Mar 3 21:27:26 2011
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.client.messaging.address;
+import static org.apache.qpid.client.messaging.address.Link.Reliability.UNRELIABLE;
+import static org.apache.qpid.client.messaging.address.Link.Reliability.AT_LEAST_ONCE;
+
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +32,8 @@ public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
+ public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
+
protected String name;
protected String _filter;
protected FilterType _filterType = FilterType.SUBJECT;
@@ -38,7 +43,18 @@ public class Link
protected int _producerCapacity = 0;
protected Node node;
protected Subscription subscription;
+ protected Reliability reliability = AT_LEAST_ONCE;
+ public Reliability getReliability()
+ {
+ return reliability;
+ }
+
+ public void setReliability(Reliability reliability)
+ {
+ this.reliability = reliability;
+ }
+
public Node getNode()
{
return node;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org