You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/30 16:37:24 UTC

svn commit: r533764 - in /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client: AMQSession.java QueueSenderAdapter.java

Author: ritchiem
Date: Mon Apr 30 07:37:23 2007
New Revision: 533764

URL: http://svn.apache.org/viewvc?view=rev&rev=533764
Log:
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client. 

Updated to allow the use of durable subscriptions but it will not be as clean as with the extensions.
Selectors are also now disabled.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533764&r1=533763&r2=533764
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Apr 30 07:37:23 2007
@@ -209,6 +209,12 @@
 
     private final boolean _strictAMQP;
 
+    /** System property to enable strickt AMQP compliance */
+    public static final String STRICT_AMQP_FATAL = "STRICT_AMQP_FATAL";
+    /** Strickt AMQP default */
+    public static final String STRICT_AMQP_FATAL_DEFAULT = "true";
+
+    private final boolean _strictAMQPFATAL;
 
     /** System property to enable immediate message prefetching */
     public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
@@ -436,6 +442,7 @@
     {
 
         _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+        _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT));
         _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
 
         _connection = con;
@@ -924,7 +931,7 @@
                                                                                             getProtocolMajorVersion(),
                                                                                             getProtocolMinorVersion(),
                                                                                             false));    // requeue
-                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");                
+                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
             }
             else
             {
@@ -1212,13 +1219,30 @@
                                                  final int prefetchLow,
                                                  final boolean noLocal,
                                                  final boolean exclusive,
-                                                 final String selector,
+                                                 String selector,
                                                  final FieldTable rawSelector,
                                                  final boolean noConsume,
                                                  final boolean autoClose) throws JMSException
     {
         checkTemporaryDestination(destination);
 
+        final String messageSelector;
+
+        if (_strictAMQP && !(selector == null || selector.equals("")))
+        {
+            if (_strictAMQPFATAL)
+            {
+                throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
+            }
+            else
+            {
+                messageSelector = null;
+            }
+        }
+        else
+        {
+            messageSelector = selector;
+        }
 
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
@@ -1229,6 +1253,7 @@
                 AMQDestination amqd = (AMQDestination) destination;
 
                 final AMQProtocolHandler protocolHandler = getProtocolHandler();
+                // TODO: Define selectors in AMQP
                 // TODO: construct the rawSelector from the selector string if rawSelector == null
                 final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
@@ -1237,7 +1262,8 @@
                 {
                     ft.addAll(rawSelector);
                 }
-                BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
+
+                BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
                                                                          _messageFactoryRegistry, AMQSession.this,
                                                                          protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
                                                                          _acknowledgeMode, noConsume, autoClose);
@@ -1630,6 +1656,8 @@
 
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
     {
+
+
         checkNotClosed();
         AMQTopic origTopic = checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
@@ -1657,13 +1685,31 @@
             {
                 topicName = new AMQShortString(topic.getTopicName());
             }
-            // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
-            // says we must trash the subscription.
-            if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
-                !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already exists for '" + topicName + "' "
+                                 + "for creation durableSubscriber. Requesting queue deletion regardless.");
+                }
+
                 deleteQueue(dest.getAMQQueueName());
             }
+            else
+            {
+                // if the queue is bound to the exchange but NOT for this topic, then the JMS spec
+                // says we must trash the subscription.
+                if (isQueueBound(dest.getExchangeName(), dest.getAMQQueueName()) &&
+                    !isQueueBound(dest.getExchangeName(), dest.getAMQQueueName(), topicName))
+                {
+                    deleteQueue(dest.getAMQQueueName());
+                }
+            }
         }
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
@@ -1761,13 +1807,31 @@
         }
         else
         {
-            if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
+            if (_strictAMQP)
             {
+                if (_strictAMQPFATAL)
+                {
+                    throw new UnsupportedOperationException("JMS Durable not currently supported by AMQP.");
+                }
+                else
+                {
+                    _logger.warn("Unable to determine if subscription already exists for '" + name + "' for unsubscribe."
+                                 + " Requesting queue deletion regardless.");
+                }
+
                 deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             }
             else
             {
-                throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+
+                if (isQueueBound(getDefaultTopicExchangeName(), AMQTopic.getDurableTopicQueueName(name, _connection)))
+                {
+                    deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Unknown subscription exchange:" + name);
+                }
             }
         }
     }
@@ -1779,10 +1843,6 @@
 
     boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
     {
-        if (isStrictAMQP())
-        {
-            throw new UnsupportedOperationException();
-        }
 
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=533764&r1=533763&r2=533764
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Mon Apr 30 07:37:23 2007
@@ -9,119 +9,141 @@
 import javax.jms.QueueSender;
 import javax.jms.InvalidDestinationException;
 
-public class QueueSenderAdapter implements QueueSender {
+public class QueueSenderAdapter implements QueueSender
+{
 
-	private BasicMessageProducer _delegate;
-	private Queue _queue;
-	private boolean closed = false;
-	
-	public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue){
-		_delegate = msgProducer;
-		_queue = queue;
-	}
-	
-	public Queue getQueue() throws JMSException {
-		checkPreConditions();
-		return _queue;
-	}
-
-	public void send(Message msg) throws JMSException {
-		checkPreConditions();
-		_delegate.send(msg);
-	}
-
-	public void send(Queue queue, Message msg) throws JMSException {
-		checkPreConditions(queue);
-		_delegate.send(queue, msg);
-	}
-
-	public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
-	throws JMSException {
-		checkPreConditions();
-		_delegate.send(msg, deliveryMode,priority,timeToLive);
-	}
-
-	public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
-			throws JMSException {
-		checkPreConditions(queue);
-		_delegate.send(queue,msg, deliveryMode,priority,timeToLive);
-	}
-	
-	public void close() throws JMSException {
-		_delegate.close();
-		closed = true;
-	}
-
-	public int getDeliveryMode() throws JMSException {
-		checkPreConditions();
-		return _delegate.getDeliveryMode();
-	}
-
-	public Destination getDestination() throws JMSException {
-		checkPreConditions();
-		return _delegate.getDestination();
-	}
-
-	public boolean getDisableMessageID() throws JMSException {
-		checkPreConditions();
-		return _delegate.getDisableMessageID();
-	}
-
-	public boolean getDisableMessageTimestamp() throws JMSException {
-		checkPreConditions();
-		return _delegate.getDisableMessageTimestamp();
-	}
-
-	public int getPriority() throws JMSException {
-		checkPreConditions();
-		return _delegate.getPriority();
-	}
-
-	public long getTimeToLive() throws JMSException {
-		checkPreConditions();
-		return _delegate.getTimeToLive();
-	}
-
-	public void send(Destination dest, Message msg) throws JMSException {
-		checkPreConditions((Queue)dest);
-		_delegate.send(dest,msg);
-	}
-
-	public void send(Message msg, int deliveryMode, int priority, long timeToLive)
-			throws JMSException {
-		checkPreConditions();
-		_delegate.send(msg, deliveryMode,priority,timeToLive);
-	}
-
-	public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
-		checkPreConditions((Queue)dest);
-		_delegate.send(dest,msg, deliveryMode,priority,timeToLive);
-	}
-
-	public void setDeliveryMode(int deliveryMode) throws JMSException {
-		checkPreConditions();
-		_delegate.setDeliveryMode(deliveryMode);
-	}
-
-	public void setDisableMessageID(boolean disableMessageID) throws JMSException {
-		checkPreConditions();
-		_delegate.setDisableMessageID(disableMessageID);
-	}
-
-	public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
-		checkPreConditions();
-		_delegate.setDisableMessageTimestamp(disableMessageTimestamp);
-	}
-
-	public void setPriority(int priority) throws JMSException {
-		checkPreConditions();
-		_delegate.setPriority(priority);
-	}
-
-	public void setTimeToLive(long timeToLive) throws JMSException {
-		checkPreConditions();
-		_delegate.setTimeToLive(timeToLive);
-	}
+    private BasicMessageProducer _delegate;
+    private Queue _queue;
+    private boolean closed = false;
+
+    public QueueSenderAdapter(BasicMessageProducer msgProducer, Queue queue)
+    {
+        _delegate = msgProducer;
+        _queue = queue;
+    }
+
+    public Queue getQueue() throws JMSException
+    {
+        checkPreConditions();
+        return _queue;
+    }
+
+    public void send(Message msg) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg);
+    }
+
+    public void send(Queue queue, Message msg) throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg);
+    }
+
+    public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Queue queue, Message msg, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkPreConditions(queue);
+        _delegate.send(queue, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void close() throws JMSException
+    {
+        _delegate.close();
+        closed = true;
+    }
+
+    public int getDeliveryMode() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDeliveryMode();
+    }
+
+    public Destination getDestination() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDestination();
+    }
+
+    public boolean getDisableMessageID() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageID();
+    }
+
+    public boolean getDisableMessageTimestamp() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getDisableMessageTimestamp();
+    }
+
+    public int getPriority() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getPriority();
+    }
+
+    public long getTimeToLive() throws JMSException
+    {
+        checkPreConditions();
+        return _delegate.getTimeToLive();
+    }
+
+    public void send(Destination dest, Message msg) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg);
+    }
+
+    public void send(Message msg, int deliveryMode, int priority, long timeToLive)
+            throws JMSException
+    {
+        checkPreConditions();
+        _delegate.send(msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException
+    {
+        checkPreConditions((Queue) dest);
+        _delegate.send(dest, msg, deliveryMode, priority, timeToLive);
+    }
+
+    public void setDeliveryMode(int deliveryMode) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDeliveryMode(deliveryMode);
+    }
+
+    public void setDisableMessageID(boolean disableMessageID) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageID(disableMessageID);
+    }
+
+    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+    }
+
+    public void setPriority(int priority) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setPriority(priority);
+    }
+
+    public void setTimeToLive(long timeToLive) throws JMSException
+    {
+        checkPreConditions();
+        _delegate.setTimeToLive(timeToLive);
+    }
 
     private void checkPreConditions() throws JMSException
     {
@@ -130,31 +152,41 @@
 
     private void checkPreConditions(Queue queue) throws JMSException
     {
-		if (closed){
-			throw new javax.jms.IllegalStateException("Publisher is closed");
-		}
-		
-		AMQSession session = ((BasicMessageProducer) _delegate).getSession();
-		
-		if(session == null || session.isClosed()){
-			throw new javax.jms.IllegalStateException("Invalid Session");
-		}
+        if (closed)
+        {
+            throw new javax.jms.IllegalStateException("Publisher is closed");
+        }
+
+        AMQSession session = ((BasicMessageProducer) _delegate).getSession();
+
+        if (session == null || session.isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Invalid Session");
+        }
 
-        if(!(queue instanceof AMQDestination))
+        if (!(queue instanceof AMQDestination))
         {
             throw new InvalidDestinationException("Queue: " + queue + " is not a valid Qpid queue");
         }
         AMQDestination destination = (AMQDestination) queue;
-        if(!destination.isValidated() && checkQueueBeforePublish())
+        if (!destination.isValidated() && checkQueueBeforePublish())
         {
 
-            if (_delegate.isBound(destination))
+            if (_delegate.getSession().isStrictAMQP())
             {
+                _delegate._logger.warn("AMQP does not support destination validation before publish, ");
                 destination.setValidated(true);
             }
             else
             {
-                throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+                if (_delegate.isBound(destination))
+                {
+                    destination.setValidated(true);
+                }
+                else
+                {
+                    throw new InvalidDestinationException("Queue: " + queue + " is not a valid destination (no bindings on server");
+                }
             }
         }
     }