You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/02/04 15:16:38 UTC

svn commit: r1442128 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/jms/ client/src/test/java/org/apache/qpid/client/ common/src/main/java/org/apache/qpid/configuration/ systests/src/main/j...

Author: rgodfrey
Date: Mon Feb  4 14:16:37 2013
New Revision: 1442128

URL: http://svn.apache.org/viewvc?rev=1442128&view=rev
Log:
QPID-4312 : [Java Client] add option for verification of queue existence during creation of a MessageProducer

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Feb  4 14:16:37 2013
@@ -183,6 +183,9 @@ public class AMQConnection extends Close
     // new amqp-0-10 list encoded format.
     private boolean _useLegacyStreamMessageFormat;
 
+    // When sending to a Queue destination for the first time, check that the queue is bound
+    private final boolean _validateQueueOnSend;
+
     //used to track the last failover time for
     //Address resolution purposes
     private volatile long _lastFailoverTime = 0;
@@ -310,6 +313,18 @@ public class AMQConnection extends Close
                     true : Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT);
         }
 
+        if(connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND) != null)
+        {
+            _validateQueueOnSend = Boolean.parseBoolean(
+                                connectionURL.getOption(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND));
+        }
+        else
+        {
+            _validateQueueOnSend =
+                Boolean.parseBoolean(System.getProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "false"));
+        }
+
+
         String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10");
         if (_logger.isDebugEnabled())
         {
@@ -1441,7 +1456,7 @@ public class AMQConnection extends Close
     {
         return _delegate.getProtocolVersion();
     }
-    
+
     public String getBrokerUUID()
     {
         if(getProtocolVersion().equals(ProtocolVersion.v0_10))
@@ -1565,4 +1580,9 @@ public class AMQConnection extends Close
     {
         _delegate.setHeartbeatListener(listener);
     }
+
+    public boolean validateQueueOnSend()
+    {
+        return _validateQueueOnSend;
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb  4 14:16:37 2013
@@ -584,7 +584,7 @@ public class AMQSession_0_10 extends AMQ
             rk = routingKey.toString();
         }
                 
-        return isQueueBound(exchangeName.toString(),queueName.toString(),rk,null);
+        return isQueueBound(exchangeName == null ? null : exchangeName.toString(),queueName == null ? null : queueName.toString(),rk,null);
     }
     
     public boolean isQueueBound(final String exchangeName, final String queueName, final String bindingKey,Map<String,Object> args)
@@ -1605,4 +1605,4 @@ public class AMQSession_0_10 extends AMQ
             }
         }
     }
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Feb  4 14:16:37 2013
@@ -42,6 +42,8 @@ import org.slf4j.Logger;
 
 public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
+
+
     enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
 
     private final Logger _logger ;
@@ -291,7 +293,6 @@ public abstract class BasicMessageProduc
         checkPreConditions();
         checkInitialDestination();
 
-
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
@@ -455,7 +456,7 @@ public abstract class BasicMessageProduc
                 JMSException ex = new JMSException("Error validating destination");
                 ex.initCause(e);
                 ex.setLinkedException(e);
-                
+
                 throw ex;
             }
             amqDestination.setExchangeExistsChecked(true);
@@ -546,7 +547,7 @@ public abstract class BasicMessageProduc
         }
     }
 
-    private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+    private void checkPreConditions() throws JMSException
     {
         checkNotClosed();
 
@@ -560,15 +561,16 @@ public abstract class BasicMessageProduc
         }
     }
 
-    private void checkInitialDestination()
+    private void checkInitialDestination() throws JMSException
     {
         if (_destination == null)
         {
             throw new UnsupportedOperationException("Destination is null");
         }
+        checkValidQueue();
     }
 
-    private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+    private void checkDestination(Destination suppliedDestination) throws JMSException
     {
         if ((_destination != null) && (suppliedDestination != null))
         {
@@ -576,6 +578,11 @@ public abstract class BasicMessageProduc
                     "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
         }
 
+        if(suppliedDestination instanceof AMQQueue)
+        {
+            AMQQueue destination = (AMQQueue) suppliedDestination;
+            checkValidQueue(destination);
+        }
         if (suppliedDestination == null)
         {
             throw new InvalidDestinationException("Supplied Destination was invalid");
@@ -583,6 +590,59 @@ public abstract class BasicMessageProduc
 
     }
 
+    /**
+     * Verifies the producer's own destination, but only when it is an {@link AMQQueue};
+     * any other destination type is left unchecked.
+     *
+     * @throws JMSException propagated from {@link #checkValidQueue(AMQDestination)}
+     */
+    void checkValidQueue() throws JMSException
+    {
+        if(_destination instanceof AMQQueue)
+        {
+            checkValidQueue(_destination);
+        }
+    }
+
+    /**
+     * Checks that the given destination is bound on the broker, unless it has already been
+     * marked as checked or queue validation is not enabled on the connection.
+     *
+     * @param destination the destination to verify
+     * @throws InvalidDestinationException if {@code isBound} reports that the destination is not bound
+     * @throws JMSException propagated from the calls made while checking
+     */
+    void checkValidQueue(AMQDestination destination) throws JMSException
+    {
+        if (destination.isCheckedForQueueBinding() || !validateQueueOnSend())
+        {
+            return;
+        }
+
+        if (getSession().isStrictAMQP())
+        {
+            // No binding query is attempted in strict AMQP mode; the destination is still
+            // marked as checked below (presumably so the warning is not repeated per send).
+            getLogger().warn("AMQP does not support destination validation before publish, "
+                             + "skipping queue binding check for: " + destination.getName());
+        }
+        else if (!isBound(destination))
+        {
+            throw new InvalidDestinationException("Queue: " + destination.getName()
+                + " is not a valid destination (no bindings on server)");
+        }
+
+        destination.setCheckedForQueueBinding(true);
+    }
+
+    /**
+     * @return the connection's validate-queue-on-send setting
+     */
+    private boolean validateQueueOnSend()
+    {
+        return _connection.validateQueueOnSend();
+    }
+
     /**
      * The session used to create this producer
      */

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Mon Feb  4 14:16:37 2013
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.jms;
 
-import org.apache.qpid.framing.AMQShortString;
-
 import java.util.List;
+import org.apache.qpid.framing.AMQShortString;
 
 /**
  Connection URL format
@@ -35,7 +34,7 @@ public interface ConnectionURL
     public static final String AMQ_PROTOCOL = "amqp";
     public static final String OPTIONS_SYNC_PERSISTENCE = "sync_persistence";
     public static final String OPTIONS_MAXPREFETCH = "maxprefetch";
-    public static final String OPTIONS_SYNC_ACK = "sync_ack";    
+    public static final String OPTIONS_SYNC_ACK = "sync_ack";
     public static final String OPTIONS_SYNC_PUBLISH = "sync_publish";
     public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format";
     public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format";
@@ -62,9 +61,11 @@ public interface ConnectionURL
     public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
     public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
     public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+    public static final String OPTIONS_VERIFY_QUEUE_ON_SEND = "verifyQueueOnSend";
+
     public static final byte  URL_0_8 = 1;
     public static final byte  URL_0_10 = 2;
-    
+
     String getURL();
 
     String getFailoverMethod();

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Mon Feb  4 14:16:37 2013
@@ -670,7 +670,6 @@ public class AMQSession_0_10Test extends
             if (m instanceof ExchangeBound)
             {
                 ExchangeBoundResult struc = new ExchangeBoundResult();
-                struc.setQueueNotFound(true);
                 result.setValue(struc);
             }
             else if (m instanceof ExchangeQuery)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Mon Feb  4 14:16:37 2013
@@ -203,6 +203,7 @@ public class ClientProperties
      * producer/consumer creation when using BindingURLs.
      */
     public static final String QPID_DECLARE_EXCHANGES_PROP_NAME = "qpid.declare_exchanges";
+    public static final String VERIFY_QUEUE_ON_SEND = "qpid.verify_queue_on_send";
 
 
     private ClientProperties()

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Mon Feb  4 14:16:37 2013
@@ -1148,7 +1148,7 @@ public class AddressBasedDestinationTest
         MessageConsumer cons = ssn.createConsumer(ssn.createTopic("ADDR:amq.topic/test"));
         MessageProducer prod = ssn.createProducer(null);
         
-        Queue queue = ssn.createQueue("ADDR:amq.topic/test");
+        Topic queue = ssn.createTopic("ADDR:amq.topic/test");
         prod.send(queue,ssn.createTextMessage("A"));
         
         Message msg = cons.receive(1000);

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/InvalidDestinationTest.java Mon Feb  4 14:16:37 2013
@@ -21,16 +21,23 @@
 
 package org.apache.qpid.test.unit.basic;
 
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
+import java.util.Collections;
+import java.util.Map;
+import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueSender;
 import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class InvalidDestinationTest extends QpidBrokerTestCase
 {
@@ -52,17 +59,22 @@ public class InvalidDestinationTest exte
 
     public void testInvalidDestination() throws Exception
     {
-        Queue invalidDestination = new AMQQueue("amq.direct","unknownQ");
-        AMQQueue validDestination = new AMQQueue("amq.direct","knownQ");
+
         QueueSession queueSession = _connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
 
+        Queue invalidDestination = queueSession.createQueue("unknownQ");
+
+        Queue validDestination = queueSession.createQueue(getTestQueueName());
+
         // This is the only easy way to create and bind a queue from the API :-(
         queueSession.createConsumer(validDestination);
+        QueueSender sender;
+        TextMessage msg= queueSession.createTextMessage("Hello");
 
-        QueueSender sender = queueSession.createSender(invalidDestination);
-        TextMessage msg = queueSession.createTextMessage("Hello");
         try
         {
+            sender = queueSession.createSender(invalidDestination);
+
             sender.send(msg);
             fail("Expected InvalidDestinationException");
         }
@@ -70,10 +82,8 @@ public class InvalidDestinationTest exte
         {
             // pass
         }
-        sender.close();
 
         sender = queueSession.createSender(null);
-        invalidDestination = new AMQQueue("amq.direct","unknownQ");
 
         try
         {
@@ -86,7 +96,6 @@ public class InvalidDestinationTest exte
         }
         sender.send(validDestination,msg);
         sender.close();
-        validDestination = new AMQQueue("amq.direct","knownQ");
         sender = queueSession.createSender(validDestination);
         sender.send(msg);
 
@@ -96,6 +105,85 @@ public class InvalidDestinationTest exte
     }
 
 
+
+    /**
+     * Runs the producer checks with queue verification switched on through the
+     * {@link ClientProperties#VERIFY_QUEUE_ON_SEND} system property.
+     */
+    public void testInvalidDestinationOnMessageProducer() throws Exception
+    {
+        // The property is set before the connection is obtained
+        setTestSystemProperty(ClientProperties.VERIFY_QUEUE_ON_SEND, "true");
+        doInvalidDestinationOnMessageProducer(getConnection());
+    }
+
+
+    /**
+     * Runs the producer checks with queue verification switched on through the
+     * {@link ConnectionURL#OPTIONS_VERIFY_QUEUE_ON_SEND} connection URL option.
+     */
+    public void testInvalidDestinationOnMessageProducerURL() throws Exception
+    {
+        Map<String, String> options = Collections.singletonMap(ConnectionURL.OPTIONS_VERIFY_QUEUE_ON_SEND, "true");
+        doInvalidDestinationOnMessageProducer(getConnectionWithOptions(options));
+    }
+
+    /**
+     * Shared checks: sends to an unknown queue must raise InvalidDestinationException,
+     * while sends to a consumer-created queue and to a topic must go through.
+     */
+    private void doInvalidDestinationOnMessageProducer(Connection connection) throws JMSException
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Queue invalidDestination = session.createQueue("unknownQ");
+
+        // Per-test queue name, matching testInvalidDestination, instead of a fixed "knownQ"
+        Queue validDestination = session.createQueue(getTestQueueName());
+
+        // This is the only easy way to create and bind a queue from the API :-(
+        session.createConsumer(validDestination);
+
+        MessageProducer sender;
+        TextMessage msg = session.createTextMessage("Hello");
+        try
+        {
+            sender = session.createProducer(invalidDestination);
+            sender.send(msg);
+            fail("Expected InvalidDestinationException");
+        }
+        catch (InvalidDestinationException ex)
+        {
+            // pass
+        }
+
+
+        sender = session.createProducer(null);
+        invalidDestination = new AMQQueue("amq.direct","unknownQ");
+
+        try
+        {
+            sender.send(invalidDestination,msg);
+            fail("Expected InvalidDestinationException");
+        }
+        catch (InvalidDestinationException ex)
+        {
+            // pass
+        }
+        sender.send(validDestination, msg);
+        sender.close();
+        sender = session.createProducer(validDestination);
+        sender.send(msg);
+        sender.close();
+
+        // A topic destination; the send is expected to succeed with verification enabled
+        Topic topic = session.createTopic("randomTopic");
+        sender = session.createProducer(topic);
+        sender.send(msg);
+        sender.close();
+    }
+
+
     public static junit.framework.Test suite()
     {
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java?rev=1442128&r1=1442127&r2=1442128&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java Mon Feb  4 14:16:37 2013
@@ -20,18 +20,15 @@ package org.apache.qpid.test.utils;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
-import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -49,13 +46,13 @@ import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
-
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -1131,6 +1128,22 @@ public class QpidBrokerTestCase extends 
         return getConnection(GUEST_USERNAME, GUEST_PASSWORD);
     }
 
+    public Connection getConnectionWithOptions(Map<String, String> options)
+                throws URLSyntaxException, NamingException, JMSException
+    {
+        ConnectionURL curl = new AMQConnectionURL(getConnectionFactory().getConnectionURLString());
+        for(Map.Entry<String,String> entry : options.entrySet())
+        {
+            curl.setOption(entry.getKey(), entry.getValue());
+        }
+        curl = new AMQConnectionURL(curl.toString());
+
+        curl.setUsername(GUEST_USERNAME);
+        curl.setPassword(GUEST_PASSWORD);
+        return getConnection(curl);
+    }
+
+
     public Connection getConnection(ConnectionURL url) throws JMSException
     {
         _logger.info(url.getURL());



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org