You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/30 09:29:52 UTC

svn commit: r1527467 - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ ...

Author: rgodfrey
Date: Mon Sep 30 07:29:51 2013
New Revision: 1527467

URL: http://svn.apache.org/r1527467
Log:
QPID-5177 : Set the default and supported outcomes on sending links in the amqp 1.0 JMS client

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
    qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Mon Sep 30 07:29:51 2013
@@ -18,9 +18,8 @@
  */
 package org.apache.qpid.amqp_1_0.jms.impl;
 
-import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
-import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
-import org.apache.qpid.amqp_1_0.client.Sender;
+import org.apache.qpid.amqp_1_0.client.*;
+import org.apache.qpid.amqp_1_0.client.Session;
 import org.apache.qpid.amqp_1_0.jms.MessageProducer;
 import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
@@ -32,9 +31,13 @@ import org.apache.qpid.amqp_1_0.type.Uns
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
+import javax.jms.Message;
 import java.util.UUID;
 import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.codec.AcceptedConstructor;
+import org.apache.qpid.amqp_1_0.type.messaging.codec.RejectedConstructor;
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
@@ -71,7 +74,14 @@ public class MessageProducerImpl impleme
         {
             try
             {
-                _sender = _session.getClientSession().createSender(_session.toAddress(_destination));
+                _sender = _session.getClientSession().createSender(_session.toAddress(_destination), new Session.SourceConfigurator()
+                {
+                    public void configureSource(final Source source)
+                    {
+                        source.setDefaultOutcome(new Accepted());
+                        source.setOutcomes(AcceptedConstructor.SYMBOL_CONSTRUCTOR, RejectedConstructor.SYMBOL_CONSTRUCTOR);
+                    }
+                });
             }
             catch (Sender.SenderCreationException e)
             {

Modified: qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java Mon Sep 30 07:29:51 2013
@@ -154,8 +154,7 @@ public class Demo extends Util
             responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
 
 
-
-            Sender s = session.createSender(queue, getWindowSize(), getMode());
+            Sender s = session.createSender(queue, getWindowSize(), getMode(), null);
 
 
             Properties properties = new Properties();

Modified: qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java Mon Sep 30 07:29:51 2013
@@ -21,7 +21,6 @@
 
 package org.apache.qpid.amqp_1_0.client;
 
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -143,8 +142,7 @@ public class Request extends Util
             responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true);
 
 
-
-            Sender s = session.createSender(queue, getWindowSize(), getMode());
+            Sender s = session.createSender(queue, getWindowSize(), getMode(), null);
 
             Transaction txn = null;
 

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java Mon Sep 30 07:29:51 2013
@@ -107,6 +107,16 @@ public class Sender implements DeliveryS
         this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled);
     }
 
+    protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source)
+    {
+
+    }
+
+    protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target)
+    {
+
+    }
+
     private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr)
     {
         org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source();
@@ -133,6 +143,8 @@ public class Sender implements DeliveryS
 
         _session = session;
         session.getConnection().checkNotClosed();
+        configureSource(source);
+        configureTarget(target);
         _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName,
                                                                     source, target, unsettled);
 
@@ -189,16 +201,15 @@ public class Sender implements DeliveryS
                 super.remoteDetached(endpoint, detach);
             }
         });
-        final org.apache.qpid.amqp_1_0.type.messaging.Source remoteSource =
-                (org.apache.qpid.amqp_1_0.type.messaging.Source) getSource();
-        _defaultOutcome = remoteSource.getDefaultOutcome();
+
+        _defaultOutcome = source.getDefaultOutcome();
         if(_defaultOutcome == null)
         {
-            if(remoteSource.getOutcomes() == null || remoteSource.getOutcomes().length == 0)
+            if(source.getOutcomes() == null || source.getOutcomes().length == 0)
             {
                 _defaultOutcome = new Accepted();
             }
-            else if(remoteSource.getOutcomes().length == 1)
+            else if(source.getOutcomes().length == 1)
             {
 
                 final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession()
@@ -206,7 +217,7 @@ public class Sender implements DeliveryS
                         .getDescribedTypeRegistry();
 
                 DescribedTypeConstructor constructor = describedTypeRegistry
-                        .getConstructor(remoteSource.getOutcomes()[0]);
+                        .getConstructor(source.getOutcomes()[0]);
                 if(constructor != null)
                 {
                     Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST);

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Sep 30 07:29:51 2013
@@ -63,15 +63,26 @@ public class Session
     public synchronized Sender createSender(final String targetName)
             throws Sender.SenderCreationException, ConnectionClosedException
     {
-        return createSender(targetName, false);
+
+        final String sourceName = UUID.randomUUID().toString();
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false);
+
     }
 
-    public synchronized Sender createSender(final String targetName, boolean synchronous)
+
+    public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator)
             throws Sender.SenderCreationException, ConnectionClosedException
     {
 
         final String sourceName = UUID.randomUUID().toString();
-        return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous);
+        return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false)
+        {
+            @Override
+            protected void configureSource(final Source source)
+            {
+                configurator.configureSource(source);
+            }
+        };
 
     }
 
@@ -83,22 +94,10 @@ public class Session
 
     }
 
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-
-        return createSender(targetName, window, mode, null);
-    }
-
     public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName)
             throws Sender.SenderCreationException, ConnectionClosedException
     {
-        return createSender(targetName, window, mode, linkName, null);
-    }
-    public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled)
-            throws Sender.SenderCreationException, ConnectionClosedException
-    {
-        return createSender(targetName, window, mode, linkName, false, unsettled);
+        return createSender(targetName, window, mode, linkName, false, null);
     }
 
     public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName,
@@ -381,4 +380,9 @@ public class Session
             }
         }
     }
+
+    public static interface SourceConfigurator
+    {
+        public void configureSource(final Source source);
+    }
 }

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java Mon Sep 30 07:29:51 2013
@@ -152,7 +152,7 @@ public class Source
         return _outcomes;
     }
 
-    public void setOutcomes(Symbol[] outcomes)
+    public void setOutcomes(Symbol... outcomes)
     {
         _outcomes = outcomes;
     }

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java Mon Sep 30 07:29:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
 
 public class AcceptedConstructor extends DescribedTypeConstructor<Accepted>
 {
+    public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:accepted:list");
     private static final Object[] DESCRIPTORS =
     {
-            Symbol.valueOf("amqp:accepted:list"),UnsignedLong.valueOf(0x0000000000000024L),
+            SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000024L),
     };
 
     private static final AcceptedConstructor INSTANCE = new AcceptedConstructor();

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java?rev=1527467&r1=1527466&r2=1527467&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java Mon Sep 30 07:29:51 2013
@@ -34,9 +34,10 @@ import java.util.List;
 
 public class RejectedConstructor extends DescribedTypeConstructor<Rejected>
 {
+    public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:rejected:list");
     private static final Object[] DESCRIPTORS =
     {
-            Symbol.valueOf("amqp:rejected:list"),UnsignedLong.valueOf(0x0000000000000025L),
+            SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000025L),
     };
 
     private static final RejectedConstructor INSTANCE = new RejectedConstructor();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org