You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 13:13:28 UTC

svn commit: r496666 [3/3] - in /incubator/qpid/branches/perftesting/qpid: java/broker/ java/broker/src/main/grammar/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/filter/ java/broker/src/main/java/or...

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Jan 16 04:13:19 2007
@@ -21,12 +21,12 @@
 package org.apache.qpid.server.queue;
 
 import java.util.List;
+import java.util.LinkedList;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Distributes messages among a list of subsscription managers, using their
  * weighting.
- *
  */
 class NestedSubscriptionManager implements SubscriptionManager
 {
@@ -44,11 +44,24 @@
         _subscribers.remove(s);
     }
 
+
+    public List<Subscription> getSubscriptions()
+    {
+        List<Subscription> allSubs = new LinkedList<Subscription>();
+
+        for (WeightedSubscriptionManager subMans : _subscribers)
+        {
+            allSubs.addAll(subMans.getSubscriptions());
+        }
+
+        return allSubs;
+    }
+
     public boolean hasActiveSubscribers()
     {
-        for(WeightedSubscriptionManager s : _subscribers)
+        for (WeightedSubscriptionManager s : _subscribers)
         {
-            if(s.hasActiveSubscribers())
+            if (s.hasActiveSubscribers())
             {
                 return true;
             }
@@ -59,9 +72,9 @@
     public Subscription nextSubscriber(AMQMessage msg)
     {
         WeightedSubscriptionManager start = current();
-        for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+        for (WeightedSubscriptionManager s = start; s != null; s = next(start))
         {
-            if(hasMore(s))
+            if (hasMore(s))
             {
                 return nextSubscriber(s);
             }
@@ -94,7 +107,7 @@
     private WeightedSubscriptionManager next()
     {
         _iterations = 0;
-        if(++_index >= _subscribers.size())
+        if (++_index >= _subscribers.size())
         {
             _index = 0;
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Jan 16 04:13:19 2007
@@ -25,6 +25,9 @@
 import org.apache.qpid.server.cluster.SimpleSendable;
 import org.apache.qpid.AMQException;
 
+import java.util.Queue;
+import java.util.List;
+
 class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
 {
     private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@
         return _count;
     }
 
+    public List<Subscription> getSubscriptions()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public boolean hasActiveSubscribers()
     {
         return getWeight() == 0;
@@ -88,9 +96,49 @@
 
     public void queueDeleted(AMQQueue queue)
     {
-        if(queue instanceof ClusteredQueue)
+        if (queue instanceof ClusteredQueue)
         {
             ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
         }
+    }
+
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return null;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+    }
+
+    public boolean isAutoClose()
+    {
+        return false;
+    }
+
+    public void close()
+    {
+        //no-op
+    }
+
+    public boolean isBrowser()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void sendNextMessage(AMQQueue queue)
+    {
+
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java Tue Jan 16 04:13:19 2007
@@ -129,7 +129,7 @@
         }
         catch (Exception e)
         {
-            _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat, e);
+            _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat);
             throw new IllegalArgumentException("Unable to decode PropertyFieldTable format:" + textFormat);
         }
     }
@@ -483,7 +483,7 @@
     {
         return _properties.containsKey(name) && (_properties.get(name) == null) &&
                _propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX);
-                
+
 
     }
 
@@ -606,7 +606,8 @@
         // AMQ start character
         if (!(Character.isLetter(propertyName.charAt(0))
               || propertyName.charAt(0) == '$'
-              || propertyName.charAt(0) == '#'))
+              || propertyName.charAt(0) == '#'
+              || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS.
         {
             throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character");
         }
@@ -1156,9 +1157,9 @@
             if (type == null)
             {
                 String msg = "Field '" + key + "' - unsupported field table type: " + type + ".";
-                    //some extra trace information...
-                    msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
-                    throw new AMQFrameDecodingException(msg);
+                //some extra trace information...
+                msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining;
+                throw new AMQFrameDecodingException(msg);
             }
             Object value;
 
@@ -1203,7 +1204,7 @@
                     value = EncodingUtils.readBytes(buffer);
                     break;
                 default:
-                    String msg = "Internal error, the following type identifier is not handled: " + type;                                        
+                    String msg = "Internal error, the following type identifier is not handled: " + type;
                     throw new AMQFrameDecodingException(msg);
             }
 

Modified: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Tue Jan 16 04:13:19 2007
@@ -55,7 +55,7 @@
     {
         return _name;
     }
-    
+
     public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
 
     public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true);
@@ -73,6 +73,8 @@
     public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
 
     public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
+
+    public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true);
 
     public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
 

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Jan 16 04:13:19 2007
@@ -43,8 +43,6 @@
         TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear");
         TestQueue q7 = bindDefault("F0000", "F0001=Bear");
         TestQueue q8 = bindDefault("F0000=Aardvark", "F0001");
-        TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana");
-        TestQueue q10 = bindDefault("F0000=Apple", "F0001");
 
         routeAndTest(new Message("Message1", "F0000"), q1);
         routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2);
@@ -74,7 +72,6 @@
         TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any");
         TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any");
         TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any");
-        TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any");
         TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any");
 
         routeAndTest(new Message("Message1", "F0000"), q1, q3);
@@ -87,16 +84,14 @@
 
     public void testMandatory() throws AMQException
     {
-        TestQueue q1 = bindDefault("F0000");
+        bindDefault("F0000");
         Message m1 = new Message("Message1", "XXXXX");
         Message m2 = new Message("Message2", "F0000");
         BasicPublishBody pb1 = m1.getPublishBody();
         pb1.mandatory = true;
-        BasicPublishBody pb2 = m1.getPublishBody();
+        BasicPublishBody pb2 = m2.getPublishBody();
         pb2.mandatory = true;
         routeAndTest(m1,true);
-
-
     }
 
     public static junit.framework.Test suite()

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue Jan 16 04:13:19 2007
@@ -5,8 +5,8 @@
 import org.apache.qpid.test.VMBrokerSetup;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
 import org.apache.qpid.client.*;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -39,12 +39,14 @@
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         ApplicationRegistry.initialise(new TestApplicationRegistry(), 1);
     }
 
     protected void tearDown() throws Exception
     {
         super.tearDown();
+        TransportConnection.killAllVMBrokers();
     }
 
     /**
@@ -57,17 +59,14 @@
         _bouncedMessageList.clear();
         Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
 
-        TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore();
 
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
-
-        AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'"));
+        AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'"));
         FieldTable ft = new PropertyFieldTable();
-        ft.setString("F1000","1");
-        MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft);
+        ft.setString("F1000", "1");
+        MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
 
-        
         //force synch to ensure the consumer has resulted in a bound queue
         ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
 
@@ -81,49 +80,45 @@
         con2.start();
 
 
-        MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false);
+        MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false);
         MessageProducer mandatoryProducer = producerSession.createProducer(queue);
 
-
         // First test - should neither be bounced nor routed
         _logger.info("Sending non-routable non-mandatory message");
-        TextMessage msg1 =  producerSession.createTextMessage("msg1");
+        TextMessage msg1 = producerSession.createTextMessage("msg1");
         nonMandatoryProducer.send(msg1);
 
         // Second test - should be bounced
         _logger.info("Sending non-routable mandatory message");
-        TextMessage msg2 =  producerSession.createTextMessage("msg2");
+        TextMessage msg2 = producerSession.createTextMessage("msg2");
         mandatoryProducer.send(msg2);
 
         // Third test - should be routed
         _logger.info("Sending routable message");
-        TextMessage msg3 =  producerSession.createTextMessage("msg3");
-        msg3.setStringProperty("F1000","1");
+        TextMessage msg3 = producerSession.createTextMessage("msg3");
+        msg3.setStringProperty("F1000", "1");
         mandatoryProducer.send(msg3);
 
 
-
         _logger.info("Starting consumer connection");
         con.start();
         TextMessage tm = (TextMessage) consumer.receive(1000L);
 
-        assertTrue("No message routed to receiver",tm != null);
-        assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText()));
+        assertTrue("No message routed to receiver", tm != null);
+        assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText()));
 
         try
         {
             Thread.sleep(1000L);
         }
-        catch(InterruptedException e)
+        catch (InterruptedException e)
         {
             ;
         }
 
-        assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1);
+        assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
         Message m = _bouncedMessageList.get(0);
-        assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2"));
-
-
+        assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
 
 
         con.close();
@@ -134,18 +129,23 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class));
+        return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);
     }
 
     public void onException(JMSException jmsException)
     {
-        _logger.warn("Caught exception on producer: ",jmsException);
+
         Exception linkedException = jmsException.getLinkedException();
-        if(linkedException instanceof AMQNoRouteException)
+        if (linkedException instanceof AMQNoRouteException)
         {
             AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
             Message bounced = (Message) noRoute.getUndeliveredMessage();
             _bouncedMessageList.add(bounced);
+            _logger.info("Caught expected NoRouteException");
+        }
+        else
+        {
+            _logger.warn("Caught exception on producer: ", jmsException);
         }
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java Tue Jan 16 04:13:19 2007
@@ -25,6 +25,7 @@
 import org.apache.mina.common.support.DefaultWriteFuture;
 
 import java.net.SocketAddress;
+import java.net.InetSocketAddress;
 import java.util.Set;
 
 public class MockIoSession implements IoSession
@@ -151,7 +152,7 @@
 
     public SocketAddress getRemoteAddress()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return new InetSocketAddress("127.0.0.1", 1234);  //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public SocketAddress getLocalAddress()

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
@@ -119,6 +120,15 @@
     }
 
     public void setSaslServer(SaslServer saslServer)
+    {
+    }
+
+    public FieldTable getClientProperties()
+    {
+        return null;
+    }
+
+    public void setClientProperties(FieldTable clientProperties)
     {
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Jan 16 04:13:19 2007
@@ -22,6 +22,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Queue;
 
 public class SubscriptionTestHelper implements Subscription
 {
@@ -68,6 +69,41 @@
 
     public void queueDeleted(AMQQueue queue)
     {
+    }
+
+    public boolean hasFilters()
+    {
+        return false;
+    }
+
+    public boolean hasInterest(AMQMessage msg)
+    {
+        return true;
+    }
+
+    public Queue<AMQMessage> getPreDeliveryQueue()
+    {
+        return null;
+    }
+
+    public void enqueueForPreDelivery(AMQMessage msg)
+    {
+        //no-op
+    }
+
+    public boolean isAutoClose()
+    {
+        return false;
+    }
+
+    public void close()
+    {
+        //no-op
+    }
+
+    public boolean isBrowser()
+    {
+        return false;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
     public int hashCode()

Modified: incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml?view=diff&rev=496666&r1=496665&r2=496666
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml Tue Jan 16 04:13:19 2007
@@ -2105,6 +2105,14 @@
     method it will raise a channel or connection exception.
     </doc>
   </field>
+
+    <field name="arguments" type="table" label="arguments for consuming">
+  <doc>
+    A set of arguments for the consume. The syntax and semantics
+    of these arguments depends on the server implementation.  This
+    field is ignored if passive is 1.
+  </doc>
+    </field>
 </method>
 
 <method name = "consume-ok" synchronous = "1" index = "21">