You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2009/09/09 15:05:52 UTC

svn commit: r812936 [5/6] - in /qpid/branches/java-network-refactor: ./ qpid/buildtools/buildCreator/ qpid/cpp/ qpid/cpp/bindings/qmf/ qpid/cpp/bindings/qmf/python/ qpid/cpp/bindings/qmf/python/qmf/ qpid/cpp/bindings/qmf/ruby/ qpid/cpp/bindings/qmf/tes...

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java Wed Sep  9 13:05:43 2009
@@ -29,6 +29,7 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.Topic;
+import javax.jms.Message;
 import java.io.IOException;
 import java.util.List;
 
@@ -327,22 +328,45 @@
         int PREFETCH = 15;
 
         //Create new session with small prefetch
-        _session = ((AMQConnection) _connection).createSession(true, Session.AUTO_ACKNOWLEDGE, PREFETCH);
+        _session = ((AMQConnection) _connection).createSession(true, Session.SESSION_TRANSACTED, PREFETCH);
 
         MessageConsumer consumer = _session.createConsumer(_queue);
 
         _connection.start();
 
+        //Start the dispatcher & Unflow the channel.
+        consumer.receiveNoWait();
+
         //Fill the prefetch and two extra so that our receive bellow allows the
-        // subscription to become active then return to a suspended state.
-        sendMessage(_session, _queue, 17);
+        // subscription to become active
+        // Previously we set this to 17 so that it would return to a suspended
+        // state. However, testing has shown that the state change can occur
+        // sufficiently quickly that logging does not occur consistently enough
+        // for testing.
+        int SEND_COUNT = 16;
+        sendMessage(_session, _queue, SEND_COUNT);
         _session.commit();
         // Retreive the first message, and start the flow of messages
-        assertNotNull("First message not retreived", consumer.receive(1000));
+        Message msg = consumer.receive(1000);
+        assertNotNull("First message not retreived", msg);
         _session.commit();
         
-        
-        _connection.close();
+        // Drain the queue to ensure there is time for the ACTIVE log message
+        // Check that we can receive all the messages
+        int receivedCount = 0;
+        while (msg != null)
+        {
+            receivedCount++;
+            msg = consumer.receive(1000);
+            _session.commit();
+        }
+
+        //Validate we received all the messages
+        assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
+
+        // Fill the queue again to suspend the consumer
+        sendMessage(_session, _queue, SEND_COUNT);
+        _session.commit();
 
         //Validate
         List<String> results = _monitor.findMatches("SUB-1003");
@@ -350,15 +374,13 @@
         try
         {
             // Validation expects three messages.
-            // The first will be logged by the QueueActor as part of the processQueue thread
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED 
-            // The second will be by the connnection as it acknowledges and activates the subscription
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State : ACTIVE
-            // The final one can be the subscription suspending as part of the SubFlushRunner or the processQueue thread
-            // As a result validating the actor is more complicated and doesn't add anything. The goal of this test is
-            // to ensure the State is correct not that a particular Actor performs the logging.
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State : SUSPENDED
+            // The Actor can be any one of the following, depending on exactly what is going on in the broker.
+            // Ideally we would test that we can get all of them but setting up
+            // the timing to do this in a consistent way is not beneficial.
+            // Ensuring the State is as expected is sufficient.
+// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
+// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
 
             assertEquals("Result set not expected size:", 3, results.size());
 
@@ -367,19 +389,10 @@
             String log = getLog(results.get(0));
             validateSubscriptionState(log, expectedState);
 
-            // Validate that the logActor is the the queue
-            String actor = fromActor(log);
-            assertTrue("Actor string does not contain expected queue("
-                       + _queue.getQueueName() + ") name." + actor,
-                       actor.contains("qu(" + _queue.getQueueName() + ")"));
-
             // After being suspended the subscription should become active.
             expectedState = "ACTIVE";
             log = getLog(results.get(1));
             validateSubscriptionState(log, expectedState);
-            // Validate we have a connection Actor
-            actor = fromActor(log);
-            assertTrue("The actor is not a connection actor:" + actor, actor.startsWith("con:"));
 
             // Validate that it was re-suspended
             expectedState = "SUSPENDED";
@@ -396,6 +409,10 @@
             }
             throw afe;
         }
+        _connection.close();
+
+        //Ensure the queue is drained before the test ends
+        drainQueue(_queue);
 
     }
 

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Wed Sep  9 13:05:43 2009
@@ -51,7 +51,13 @@
 public class DurableSubscriptionTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
-
+    
+    /** Timeout for receive() if we are expecting a message */
+    private static final long POSITIVE_RECEIVE_TIMEOUT = 2000;
+    
+    /** Timeout for receive() if we are not expecting a message */
+    private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+    
     public void testUnsubscribe() throws Exception
     {
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -76,16 +82,18 @@
 
         Message msg;
         _logger.info("Receive message on consumer 1:expecting A");
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        _logger.info("Receive message on consumer 1:expecting A");
-        msg = consumer2.receive();
+        _logger.info("Receive message on consumer 2:expecting A");
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         _logger.info("Receive message on consumer 1 :expecting null");
         assertEquals(null, msg);
 
@@ -96,14 +104,15 @@
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         _logger.info("Receive message on consumer 2 :expecting null");
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         _logger.info("Close connection");
@@ -143,14 +152,16 @@
         producer.send(session1.createTextMessage("A"));
 
         Message msg;
-        msg = consumer1.receive();
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        msg = consumer2.receive();
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("A", ((TextMessage) msg).getText());
-        msg = consumer2.receive(1000);
+        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         consumer2.close();
@@ -220,8 +231,8 @@
         msg = consumer1.receive(500);
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
-        msg = consumer2.receive();
-        assertNotNull(msg);
+        msg = consumer2.receive(POSITIVE_RECEIVE_TIMEOUT);
+        assertNotNull("Message should have been received",msg);
         assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
         msg = consumer2.receive(500);
         assertNull("There should be no more messages for consumption on consumer2.", msg);
@@ -235,10 +246,10 @@
         producer.send(session0.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals("B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
-        msg = consumer1.receive(1000);
+        msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
         // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
@@ -296,7 +307,7 @@
 
     	producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
     	
-    	Message msg = liveSubscriber.receive();
+    	Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
     	assertNotNull ("Message should have been received", msg);
     	assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
     	assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -331,7 +342,7 @@
     	assertNotNull("Subscriber should have been created", liveSubscriber);
     	
     	producer.send(session.createTextMessage("testDurableWithInvalidSelector2"));
-    	Message msg = liveSubscriber.receive();
+    	Message msg = liveSubscriber.receive(POSITIVE_RECEIVE_TIMEOUT);
     	assertNotNull ("Message should have been received", msg);
     	assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
     	assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
@@ -360,13 +371,13 @@
         // Send 1 matching message and 1 non-matching message
         sendMatchingAndNonMatchingMessage(session, producer);
 
-        Message rMsg = subA.receive(1000);
+        Message rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelector1",
                      ((TextMessage) rMsg).getText());
         
-        rMsg = subA.receive(1000);
+        rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         
         // Disconnect subscriber
@@ -379,13 +390,13 @@
         
         // Check messages are recieved properly
         sendMatchingAndNonMatchingMessage(session, producer);
-        rMsg = subB.receive(1000);
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelector2",
                      ((TextMessage) rMsg).getText());
         
-        rMsg = subB.receive(1000);
+        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertNull(rMsg);
         session.unsubscribe("testResubscribeWithChangedSelector");
     }
@@ -429,5 +440,5 @@
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(DurableSubscriptionTest.class);
-    }
+    }  
 }

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Wed Sep  9 13:05:43 2009
@@ -479,7 +479,7 @@
         _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
         _pubSession.commit();
 
-        assertNotNull(_consumer.receive(100));
+        assertNotNull(_consumer.receive(1000));
 
         _publisher.send(_pubSession.createTextMessage(MESSAGE_TEXT));
 

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java Wed Sep  9 13:05:43 2009
@@ -55,7 +55,7 @@
     {
         super.setUp();
         setSystemProperty("QPID_WORK", System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
-        startBroker(FAILING_PORT);
+        startBroker(failingPort);
     }
 
     /**
@@ -76,7 +76,7 @@
 
     public void tearDown() throws Exception
     {
-    	stopBroker(FAILING_PORT);
+    	stopBroker(_broker.equals(VM)?FAILING_PORT:FAILING_PORT);
         super.tearDown();
         FileUtils.deleteDirectory(System.getProperty("java.io.tmpdir")+"/"+getFailingPort());
     }

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitor.java Wed Sep  9 13:05:43 2009
@@ -31,6 +31,7 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -118,7 +119,7 @@
      * @throws java.io.FileNotFoundException if the Log file can nolonger be found
      * @throws IOException                   thrown when reading the log file
      */
-    public boolean waitForMessage(String message, long wait)
+    public boolean waitForMessage(String message, long wait, boolean printFileOnFailure)
             throws FileNotFoundException, IOException
     {
         // Loop through alerts until we're done or wait ms seconds have passed,
@@ -126,20 +127,35 @@
         BufferedReader reader = new BufferedReader(new FileReader(_logfile));
         boolean found = false;
         long endtime = System.currentTimeMillis() + wait;
+        ArrayList<String> contents = new ArrayList<String>();
         while (!found && System.currentTimeMillis() < endtime)
         {
             while (reader.ready())
             {
                 String line = reader.readLine();
+                contents.add(line);
                 if (line.contains(message))
                 {
                     found = true;
                 }
             }
         }
-
+        if (!found && printFileOnFailure)
+        {
+            for (String line : contents)
+            {
+                System.out.println(line);
+            }
+        }
         return found;
     }
+    
+
+    public boolean waitForMessage(String messageCountAlert, long alertLogWaitPeriod) throws FileNotFoundException, IOException
+    {
+       return waitForMessage(messageCountAlert, alertLogWaitPeriod, true);
+    }
+
 
     /**
      * Read the log file in to memory as a String

Modified: qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java (original)
+++ qpid/branches/java-network-refactor/qpid/java/systests/src/main/java/org/apache/qpid/util/LogMonitorTest.java Wed Sep  9 13:05:43 2009
@@ -30,25 +30,25 @@
 public class LogMonitorTest extends TestCase
 {
 
+    private LogMonitor _monitor;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        _monitor = new LogMonitor();
+        _monitor.getMonitoredFile().deleteOnExit(); // Make sure we clean up
+    }
+    
     /**
      * Test that a new file is created when attempting to set up a monitor with
      * the default constructor.
      */
     public void testMonitor()
     {
-        // Validate that a NPE is thrown with null input
-        try
-        {
-            LogMonitor montior = new LogMonitor();
-            //Validte that the monitor is now running on a new file
-            assertTrue("New file does not have correct name:" + montior.
-                    getMonitoredFile().getName(),
-                       montior.getMonitoredFile().getName().contains("LogMonitor"));
-        }
-        catch (IOException ioe)
-        {
-            fail("IOE thrown:" + ioe);
-        }
+        //Validate that the monitor is now running on a new file
+        assertTrue("New file does not have correct name:" + _monitor.
+                getMonitoredFile().getName(),
+                _monitor.getMonitoredFile().getName().contains("LogMonitor"));
     }
 
     /**
@@ -63,13 +63,11 @@
         File testFile = File.createTempFile("testMonitorFile", ".log");
         testFile.deleteOnExit();
 
-        LogMonitor monitor;
-
         //Ensure that we can create a monitor on a file
         try
         {
-            monitor = new LogMonitor(testFile);
-            assertEquals(testFile, monitor.getMonitoredFile());
+            _monitor = new LogMonitor(testFile);
+            assertEquals(testFile, _monitor.getMonitoredFile());
         }
         catch (IOException ioe)
         {
@@ -136,13 +134,12 @@
      */
     public void testFindMatches_Match() throws IOException
     {
-        LogMonitor monitor = new LogMonitor();
 
         String message = getName() + ": Test Message";
 
         Logger.getRootLogger().warn(message);
 
-        validateLogContainsMessage(monitor, message);
+        validateLogContainsMessage(_monitor, message);
     }
 
     /**
@@ -152,35 +149,17 @@
      */
     public void testFindMatches_NoMatch() throws IOException
     {
-        LogMonitor monitor = new LogMonitor();
-
         String message = getName() + ": Test Message";
 
         Logger.getRootLogger().warn(message);
 
         String notLogged = "This text was not logged";
 
-        validateLogDoesNotContainsMessage(monitor, notLogged);
-    }
-
-    public void testWaitForMessage_Found() throws IOException
-    {
-        LogMonitor monitor = new LogMonitor();
-
-        String message = getName() + ": Test Message";
-
-        long TIME_OUT = 2000;
-
-        logMessageWithDelay(message, TIME_OUT / 2);
-
-        assertTrue("Message was not logged ",
-                   monitor.waitForMessage(message, TIME_OUT));
+        validateLogDoesNotContainsMessage(_monitor, notLogged);
     }
 
     public void testWaitForMessage_Timeout() throws IOException
     {
-        LogMonitor monitor = new LogMonitor();
-
         String message = getName() + ": Test Message";
 
         long TIME_OUT = 2000;
@@ -189,41 +168,37 @@
 
         // Verify that we can time out waiting for a message
         assertFalse("Message was logged ",
-                    monitor.waitForMessage(message, TIME_OUT / 2));
+                    _monitor.waitForMessage(message, TIME_OUT / 2, false));
 
         // Verify that the message did eventually get logged.
         assertTrue("Message was never logged.",
-                   monitor.waitForMessage(message, TIME_OUT));
+                    _monitor.waitForMessage(message, TIME_OUT));
     }
 
     public void testReset() throws IOException
     {
-        LogMonitor monitor = new LogMonitor();
-
         String message = getName() + ": Test Message";
 
         Logger.getRootLogger().warn(message);
 
-        validateLogContainsMessage(monitor, message);
+        validateLogContainsMessage(_monitor, message);
 
         String LOG_RESET_TEXT = "Log Monitor Reset";
 
-        validateLogDoesNotContainsMessage(monitor, LOG_RESET_TEXT);
+        validateLogDoesNotContainsMessage(_monitor, LOG_RESET_TEXT);
 
-        monitor.reset();
+        _monitor.reset();
 
-        assertEquals("", monitor.readFile());
+        assertEquals("", _monitor.readFile());
     }
 
     public void testRead() throws IOException
     {
-        LogMonitor monitor = new LogMonitor();
-
         String message = getName() + ": Test Message";
 
         Logger.getRootLogger().warn(message);
 
-        String fileContents = monitor.readFile();
+        String fileContents = _monitor.readFile();
 
         assertTrue("Logged message not found when reading file.",
                    fileContents.contains(message));

Modified: qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes (original)
+++ qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes Wed Sep  9 13:05:43 2009
@@ -4,6 +4,7 @@
 //These tests are for the java broker
 org.apache.qpid.server.security.acl.SimpleACLTest#*
 org.apache.qpid.server.plugins.PluginTest#*
+org.apache.qpid.server.BrokerStartupTest#*
 
 // This test is not finished
 org.apache.qpid.test.testcases.TTLTest#*
@@ -82,3 +83,9 @@
 
 // CPP Broker does not have a JMX interface to test
 org.apache.qpid.management.jmx.*
+
+// 0-10 is not supported by the MethodRegistry
+org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
+
+// QPID-2084 : this test needs more work for 0-10
+org.apache.qpid.test.unit.client.DynamicQueueExchangeCreateTest#*

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/010Excludes:520691-726139
+/qpid/trunk/qpid/java/test-profiles/010Excludes:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/08Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/08Excludes:520691-726139
+/qpid/trunk/qpid/java/test-profiles/08Excludes:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/08StandaloneExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:520691-726139
+/qpid/trunk/qpid/java/test-profiles/08StandaloneExcludes:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/08TransientExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/08TransientExcludes:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/Excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/Excludes:520691-726139
+/qpid/trunk/qpid/java/test-profiles/Excludes:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/XAExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/XAExcludes:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/clean-dir
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/clean-dir:520691-726139
+/qpid/trunk/qpid/java/test-profiles/clean-dir:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.async.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/cpp.async.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.cluster.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.cluster.testprofile:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.noprefetch.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/cpp.noprefetch.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.excludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.excludes:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.ssl.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.ssl.testprofile:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/cpp.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/cpp.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/default.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/default.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/default.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/java-derby.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/java-derby.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/java.testprofile
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/java.testprofile:520691-726139
+/qpid/trunk/qpid/java/test-profiles/java.testprofile:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/log4j-test.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:520691-726139
+/qpid/trunk/qpid/java/test-profiles/log4j-test.xml:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/test-provider.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1,2 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/test-provider.properties:520691-726139
+/qpid/trunk/qpid/java/test-profiles/test-provider.properties:805429-812920

Propchange: qpid/branches/java-network-refactor/qpid/java/test-profiles/test_resources/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+/incubator/qpid/trunk/qpid/java/test-profiles/test_resources:520691-726139

Propchange: qpid/branches/java-network-refactor/qpid/python/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Sep  9 13:05:43 2009
@@ -0,0 +1 @@
+build

Modified: qpid/branches/java-network-refactor/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/commands/qpid-config?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/commands/qpid-config (original)
+++ qpid/branches/java-network-refactor/qpid/python/commands/qpid-config Wed Sep  9 13:05:43 2009
@@ -110,6 +110,12 @@
     print "    --force-if-not-empty  Force delete of queue even if it's not empty"
     print "    --force-if-used       Force delete of queue even if it's currently used"
     print
+    print "Add Exchange <type> values:"
+    print "    direct     Direct exchange for point-to-point communication"
+    print "    fanout     Fanout exchange for broadcast communication"
+    print "    topic      Topic exchange that routes messages using binding keys with wildcards"
+    print "    headers    Headers exchange that matches header fields against the binding keys"
+    print
     print "Add Exchange Options:"
     print "    --alternate-exchange [name of the alternate exchange]"
     print "                         In the event that a message cannot be routed, this is the name of the exchange to"
@@ -190,6 +196,8 @@
                 if ex.durable:    print "--durable",
                 if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
                 if IVE in args and args[IVE] == 1: print "--ive",
+                if ex.altExchange:
+                    print "--alternate-exchange=%s" % ex._altExchange_.name,
                 print
 
     def ExchangeListRecurse (self, filter):

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/compat.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/compat.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/compat.py Wed Sep  9 13:05:43 2009
@@ -26,3 +26,10 @@
   from socket import SHUT_RDWR
 except ImportError:
   SHUT_RDWR = 2
+
+try:
+  from traceback import format_exc
+except ImportError:
+  import sys, traceback
+  def format_exc():
+    return "".join(traceback.format_exception(*sys.exc_info()))

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/datatypes.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/datatypes.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/datatypes.py Wed Sep  9 13:05:43 2009
@@ -132,7 +132,7 @@
     return hash(self.value)
 
   def __cmp__(self, other):
-    if other is None:
+    if other.__class__ not in (int, long, Serial):
       return 1
 
     other = serial(other)
@@ -150,7 +150,10 @@
     return Serial(self.value + other)
 
   def __sub__(self, other):
-    return Serial(self.value - other)
+    if isinstance(other, Serial):
+      return self.value - other.value
+    else:
+      return Serial(self.value - other)
 
   def __repr__(self):
     return "serial(%s)" % self.value
@@ -169,7 +172,7 @@
 
   def __contains__(self, n):
     return self.lower <= n and n <= self.upper
-    
+
   def __iter__(self):
     i = self.lower
     while i <= self.upper:
@@ -230,7 +233,7 @@
 
   def add(self, lower, upper = None):
     self.add_range(Range(lower, upper))
-    
+
   def __iter__(self):
     return iter(self.ranges)
 

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/messaging.py Wed Sep  9 13:05:43 2009
@@ -6,9 +6,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,63 +30,18 @@
   - protocol negotiation/multiprotocol impl
 """
 
-import connection, time, socket, sys, traceback
 from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from concurrency import synchronized, Waiter
+from datatypes import timestamp, uuid4, Serial
 from logging import getLogger
 from ops import PRIMITIVE
-from session import Client, INCOMPLETE
 from threading import Thread, RLock, Condition
-from util import connect
+from util import default
 
 log = getLogger("qpid.messaging")
 
 static = staticmethod
 
-def synchronized(meth):
-  def sync_wrapper(self, *args, **kwargs):
-    self.lock()
-    try:
-      return meth(self, *args, **kwargs)
-    finally:
-      self.unlock()
-  return sync_wrapper
-
-class Lockable(object):
-
-  def lock(self):
-    self._lock.acquire()
-
-  def unlock(self):
-    self._lock.release()
-
-  def wait(self, predicate, timeout=None):
-    passed = 0
-    start = time.time()
-    while not predicate():
-      if timeout is None:
-        # using the timed wait prevents keyboard interrupts from being
-        # blocked while waiting
-        self._condition.wait(3)
-      elif passed < timeout:
-        self._condition.wait(timeout - passed)
-      else:
-        return False
-      passed = time.time() - start
-    return True
-
-  def notify(self):
-    self._condition.notify()
-
-  def notifyAll(self):
-    self._condition.notifyAll()
-
-def default(value, default):
-  if value is None:
-    return default
-  else:
-    return value
-
 AMQP_PORT = 5672
 AMQPS_PORT = 5671
 
@@ -101,10 +56,20 @@
 
 UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
 
-class ConnectError(Exception):
+class ConnectionError(Exception):
+  """
+  The base class for all connection related exceptions.
+  """
+  pass
+
+class ConnectError(ConnectionError):
+  """
+  Exception raised when there is an error connecting to the remote
+  peer.
+  """
   pass
 
-class Connection(Lockable):
+class Connection:
 
   """
   A Connection manages a group of L{Sessions<Session>} and connects
@@ -142,12 +107,35 @@
     self.host = host
     self.port = default(port, AMQP_PORT)
     self.started = False
-    self._conn = None
     self.id = str(uuid4())
     self.session_counter = 0
     self.sessions = {}
+    self.reconnect = False
+    self._connected = False
     self._lock = RLock()
     self._condition = Condition(self._lock)
+    self._waiter = Waiter(self._condition)
+    self._modcount = Serial(0)
+    self.error = None
+    from driver import Driver
+    self._driver = Driver(self)
+    self._driver.start()
+
+  def _wait(self, predicate, timeout=None):
+    return self._waiter.wait(predicate, timeout=timeout)
+
+  def _wakeup(self):
+    self._modcount += 1
+    self._driver.wakeup()
+
+  def _check_error(self, exc=ConnectionError):
+    if self.error:
+      raise exc(*self.error)
+
+  def _ewait(self, predicate, timeout=None, exc=ConnectionError):
+    result = self._wait(lambda: self.error or predicate(), timeout)
+    self._check_error(exc)
+    return result
 
   @synchronized
   def session(self, name=None, transactional=False):
@@ -173,8 +161,7 @@
     else:
       ssn = Session(self, name, self.started, transactional=transactional)
       self.sessions[name] = ssn
-      if self._conn is not None:
-        ssn._attach()
+      self._wakeup()
       return ssn
 
   @synchronized
@@ -186,38 +173,25 @@
     """
     Connect to the remote endpoint.
     """
-    if self._conn is not None:
-      return
-    try:
-      self._socket = connect(self.host, self.port)
-    except socket.error, e:
-      raise ConnectError(e)
-    self._conn = connection.Connection(self._socket)
-    try:
-      self._conn.start()
-    except connection.VersionError, e:
-      raise ConnectError(e)
-
-    for ssn in self.sessions.values():
-      ssn._attach()
+    self._connected = True
+    self._wakeup()
+    self._ewait(lambda: self._driver._connected, exc=ConnectError)
 
   @synchronized
   def disconnect(self):
     """
     Disconnect from the remote endpoint.
     """
-    if self._conn is not None:
-      self._conn.close()
-      self._conn = None
-    for ssn in self.sessions.values():
-      ssn._disconnected()
+    self._connected = False
+    self._wakeup()
+    self._ewait(lambda: not self._driver._connected)
 
   @synchronized
   def connected(self):
     """
     Return true if the connection is connected, false otherwise.
     """
-    return self._conn is not None
+    return self._connected
 
   @synchronized
   def start(self):
@@ -255,22 +229,32 @@
   def __init__(self, value):
     self.value = value
 
+  # XXX: this should become part of the driver
   def _bind(self, ssn, exchange, queue):
     ssn.exchange_bind(exchange=exchange, queue=queue,
                       binding_key=self.value.replace("*", "#"))
 
-FILTER_DEFAULTS = {
-  "topic": Pattern("*")
-  }
+class SessionError(Exception):
+  pass
 
-def delegate(session):
-  class Delegate(Client):
+class Disconnected(SessionError):
+  """
+  Exception raised when an operation is attempted that is illegal when
+  disconnected.
+  """
+  pass
 
-    def message_transfer(self, cmd):
-      session._message_transfer(cmd)
-  return Delegate
+class NontransactionalSession(SessionError):
+  """
+  Exception raised when commit or rollback is attempted on a non
+  transactional session.
+  """
+  pass
 
-class Session(Lockable):
+class TransactionAborted(SessionError):
+  pass
+
+class Session:
 
   """
   Sessions provide a linear context for sending and receiving
@@ -281,18 +265,28 @@
     self.connection = connection
     self.name = name
     self.started = started
+
     self.transactional = transactional
-    self._ssn = None
+
+    self.committing = False
+    self.committed = True
+    self.aborting = False
+    self.aborted = False
+
     self.senders = []
     self.receivers = []
-    self.closing = False
+    self.outgoing = []
     self.incoming = []
-    self.closed = False
     self.unacked = []
-    if self.transactional:
-      self.acked = []
-    self._lock = RLock()
-    self._condition = Condition(self._lock)
+    self.acked = []
+    # XXX: I hate this name.
+    self.ack_capacity = UNLIMITED
+
+    self.closing = False
+    self.closed = False
+
+    self._lock = connection._lock
+    self.running = True
     self.thread = Thread(target = self.run)
     self.thread.setDaemon(True)
     self.thread.start()
@@ -300,60 +294,17 @@
   def __repr__(self):
     return "<Session %s>" % self.name
 
-  def _attach(self):
-    self._ssn = self.connection._conn.session(self.name, delegate=delegate(self))
-    self._ssn.auto_sync = False
-    self._ssn.invoke_lock = self._lock
-    self._ssn.lock = self._lock
-    self._ssn.condition = self._condition
-    if self.transactional:
-      self._ssn.tx_select()
-    for link in self.senders + self.receivers:
-      link._link()
-
-  def _disconnected(self):
-    self._ssn = None
-    for link in self.senders + self.receivers:
-      link._disconnected()
-
-  @synchronized
-  def _message_transfer(self, cmd):
-    m = Message010(cmd.payload)
-    m.headers = cmd.headers
-    m.id = cmd.id
-    msg = self._decode(m)
-    rcv = self.receivers[int(cmd.destination)]
-    msg._receiver = rcv
-    log.debug("RECV [%s] %s", self, msg)
-    self.incoming.append(msg)
-    self.notifyAll()
-    return INCOMPLETE
-
-  def _decode(self, message):
-    dp = message.get("delivery_properties")
-    mp = message.get("message_properties")
-    ap = mp.application_headers
-    enc, dec = get_codec(mp.content_type)
-    content = dec(message.body)
-    msg = Message(content)
-    msg.id = mp.message_id
-    if ap is not None:
-      msg.to = ap.get("to")
-      msg.subject = ap.get("subject")
-    msg.user_id = mp.user_id
-    if mp.reply_to is not None:
-      msg.reply_to = reply_to2addr(mp.reply_to)
-    msg.correlation_id = mp.correlation_id
-    msg.properties = mp.application_headers
-    msg.content_type = mp.content_type
-    msg._transfer_id = message.id
-    return msg
+  def _wait(self, predicate, timeout=None):
+    return self.connection._wait(predicate, timeout=timeout)
+
+  def _wakeup(self):
+    self.connection._wakeup()
 
-  def _exchange_query(self, address):
-    # XXX: auto sync hack is to avoid deadlock on future
-    result = self._ssn.exchange_query(name=address, sync=True)
-    self._ssn.sync()
-    return result.get()
+  def _check_error(self, exc=SessionError):
+    self.connection._check_error(exc)
+
+  def _ewait(self, predicate, timeout=None, exc=SessionError):
+    return self.connection._ewait(predicate, timeout, exc)
 
   @synchronized
   def sender(self, target):
@@ -368,8 +319,11 @@
     """
     sender = Sender(self, len(self.senders), target)
     self.senders.append(sender)
-    if self._ssn is not None:
-      sender._link()
+    self._wakeup()
+    # XXX: because of the lack of waiting here we can end up getting
+    # into the driver loop with messages sent for senders that haven't
+    # been linked yet; something similar can probably happen for
+    # receivers
     return sender
 
   @synchronized
@@ -386,8 +340,7 @@
     receiver = Receiver(self, len(self.receivers), source, filter,
                         self.started)
     self.receivers.append(receiver)
-    if self._ssn is not None:
-      receiver._link()
+    self._wakeup()
     return receiver
 
   @synchronized
@@ -415,43 +368,45 @@
 
   @synchronized
   def _get(self, predicate, timeout=None):
-    if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing),
-                 timeout):
+    if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing),
+                  timeout):
       msg = self._pop(predicate)
       if msg is not None:
+        msg._receiver.returned += 1
         self.unacked.append(msg)
         log.debug("RETR [%s] %s", self, msg)
         return msg
     return None
 
   @synchronized
-  def acknowledge(self, message=None):
+  def acknowledge(self, message=None, sync=True):
     """
     Acknowledge the given L{Message}. If message is None, then all
     unacknowledged messages on the session are acknowledged.
 
     @type message: Message
     @param message: the message to acknowledge or None
+    @type sync: boolean
+    @param sync: if true then block until the message(s) are acknowledged
     """
     if message is None:
       messages = self.unacked[:]
     else:
       messages = [message]
 
-    ids = RangedSet(*[m._transfer_id for m in messages])
-    for range in ids:
-      self._ssn.receiver._completed.add_range(range)
-    self._ssn.channel.session_completed(self._ssn.receiver._completed)
-    self._ssn.message_accept(ids, sync=True)
-    self._ssn.sync()
-
     for m in messages:
-      try:
-        self.unacked.remove(m)
-      except ValueError:
-        pass
-      if self.transactional:
-        self.acked.append(m)
+      if self.ack_capacity is not UNLIMITED:
+        if self.ack_capacity <= 0:
+          # XXX: this is currently a SendError, maybe it should be a SessionError?
+          raise InsufficientCapacity("ack_capacity = %s" % self.ack_capacity)
+        self._wakeup()
+        self._ewait(lambda: len(self.acked) < self.ack_capacity)
+      self.unacked.remove(m)
+      self.acked.append(m)
+
+    self._wakeup()
+    if sync:
+      self._ewait(lambda: not [m for m in messages if m in self.acked])
 
   @synchronized
   def commit(self):
@@ -461,11 +416,12 @@
     """
     if not self.transactional:
       raise NontransactionalSession()
-    if self._ssn is None:
-      raise Disconnected()
-    self._ssn.tx_commit(sync=True)
-    del self.acked[:]
-    self._ssn.sync()
+    self.committing = True
+    self._wakeup()
+    self._ewait(lambda: not self.committing)
+    if self.aborted:
+      raise TransactionAborted()
+    assert self.committed
 
   @synchronized
   def rollback(self):
@@ -475,21 +431,10 @@
     """
     if not self.transactional:
       raise NontransactionalSession()
-    if self._ssn is None:
-      raise Disconnected()
-
-    ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming])
-    for range in ids:
-      self._ssn.receiver._completed.add_range(range)
-    self._ssn.channel.session_completed(self._ssn.receiver._completed)
-    self._ssn.message_release(ids)
-    self._ssn.tx_rollback(sync=True)
-
-    del self.incoming[:]
-    del self.unacked[:]
-    del self.acked[:]
-
-    self._ssn.sync()
+    self.aborting = True
+    self._wakeup()
+    self._ewait(lambda: not self.aborting)
+    assert self.aborted
 
   @synchronized
   def start(self):
@@ -508,7 +453,7 @@
     for rcv in self.receivers:
       rcv.stop()
     # TODO: think about stopping individual receivers in listen mode
-    self.wait(lambda: self._peek(self._pred) is None)
+    self._wait(lambda: self._peek(self._pred) is None)
     self.started = False
 
   def _pred(self, m):
@@ -516,6 +461,7 @@
 
   @synchronized
   def run(self):
+    self.running = True
     try:
       while True:
         msg = self._get(self._pred)
@@ -524,10 +470,10 @@
         else:
           msg._receiver.listener(msg)
           if self._peek(self._pred) is None:
-            self.notifyAll()
+            self.connection._waiter.notifyAll()
     finally:
-      self.closed = True
-      self.notifyAll()
+      self.running = False
+      self.connection._waiter.notifyAll()
 
   @synchronized
   def close(self):
@@ -538,45 +484,22 @@
       link.close()
 
     self.closing = True
-    self.notifyAll()
-    self.wait(lambda: self.closed)
+    self._wakeup()
+    self._ewait(lambda: self.closed and not self.running)
     while self.thread.isAlive():
       self.thread.join(3)
     self.thread = None
-    self._ssn.close()
-    self._ssn = None
+    # XXX: should be able to express this condition through API calls
+    self._ewait(lambda: not self.outgoing and not self.acked)
     self.connection._remove_session(self)
 
-def parse_addr(address):
-  parts = address.split("/", 1)
-  if len(parts) == 1:
-    return parts[0], None
-  else:
-    return parts[0], parts[i1]
-
-def reply_to2addr(reply_to):
-  if reply_to.routing_key is None:
-    return reply_to.exchange
-  elif reply_to.exchange in (None, ""):
-    return reply_to.routing_key
-  else:
-    return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-
-class Disconnected(Exception):
-  """
-  Exception raised when an operation is attempted that is illegal when
-  disconnected.
-  """
+class SendError(SessionError):
   pass
 
-class NontransactionalSession(Exception):
-  """
-  Exception raised when commit or rollback is attempted on a non
-  transactional session.
-  """
+class InsufficientCapacity(SendError):
   pass
 
-class Sender(Lockable):
+class Sender:
 
   """
   Sends outgoing messages.
@@ -586,100 +509,99 @@
     self.session = session
     self.index = index
     self.target = target
+    self.capacity = UNLIMITED
+    self.queued = Serial(0)
+    self.acked = Serial(0)
     self.closed = False
-    self._ssn = None
-    self._exchange = None
-    self._routing_key = None
-    self._subject = None
     self._lock = self.session._lock
-    self._condition = self.session._condition
 
-  def _link(self):
-    self._ssn = self.session._ssn
-    node, self._subject = parse_addr(self.target)
-    result = self.session._exchange_query(node)
-    if result.not_found:
-      # XXX: should check 'create' option
-      self._ssn.queue_declare(queue=node, durable=False, sync=True)
-      self._ssn.sync()
-      self._exchange = ""
-      self._routing_key = node
-    else:
-      self._exchange = node
-      self._routing_key = self._subject
+  def _wakeup(self):
+    self.session._wakeup()
 
-  def _disconnected(self):
-    self._ssn = None
+  def _check_error(self, exc=SendError):
+    self.session._check_error(exc)
+
+  def _ewait(self, predicate, timeout=None, exc=SendError):
+    return self.session._ewait(predicate, timeout, exc)
 
   @synchronized
-  def send(self, object):
+  def pending(self):
+    """
+    Returns the number of messages awaiting acknowledgment.
+    @rtype: int
+    @return: the number of unacknowledged messages
+    """
+    return self.queued - self.acked
+
+  @synchronized
+  def send(self, object, sync=True, timeout=None):
     """
     Send a message. If the object passed in is of type L{unicode},
     L{str}, L{list}, or L{dict}, it will automatically be wrapped in a
     L{Message} and sent. If it is of type L{Message}, it will be sent
-    directly.
+    directly. If the sender capacity is not L{UNLIMITED} then send
+    will block until there is available capacity to send the message.
+    If the timeout parameter is specified, then send will raise an
+    L{InsufficientCapacity} exception if capacity does not become
+    available within the specified time.
 
     @type object: unicode, str, list, dict, Message
     @param object: the message or content to send
+
+    @type sync: boolean
+    @param sync: if true then block until the message is sent
+
+    @type timeout: float
+    @param timeout: the time to wait for available capacity
     """
 
-    if self._ssn is None:
+    if not self.session.connection._connected or self.session.closing:
       raise Disconnected()
 
     if isinstance(object, Message):
       message = object
     else:
       message = Message(object)
-    # XXX: what if subject is specified for a normal queue?
-    if self._routing_key is None:
-      rk = message.subject
-    else:
-      rk = self._routing_key
-    # XXX: do we need to query to figure out how to create the reply-to interoperably?
-    if message.reply_to:
-      rt = self._ssn.reply_to(*parse_addr(message.reply_to))
-    else:
-      rt = None
-    dp = self._ssn.delivery_properties(routing_key=rk)
-    mp = self._ssn.message_properties(message_id=message.id,
-                                      user_id=message.user_id,
-                                      reply_to=rt,
-                                      correlation_id=message.correlation_id,
-                                      content_type=message.content_type,
-                                      application_headers=message.properties)
-    if message.subject is not None:
-      if mp.application_headers is None:
-        mp.application_headers = {}
-      mp.application_headers["subject"] = message.subject
-    if message.to is not None:
-      if mp.application_headers is None:
-        mp.application_headers = {}
-      mp.application_headers["to"] = message.to
-    enc, dec = get_codec(message.content_type)
-    body = enc(message.content)
-    self._ssn.message_transfer(destination=self._exchange,
-                               message=Message010(dp, mp, body),
-                               sync=True)
-    log.debug("SENT [%s] %s", self.session, message)
-    self._ssn.sync()
+
+    if self.capacity is not UNLIMITED:
+      if self.capacity <= 0:
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
+      if not self._ewait(lambda: self.pending() < self.capacity, timeout=timeout):
+        raise InsufficientCapacity("capacity = %s" % self.capacity)
+
+    # XXX: what if we send the same message to multiple senders?
+    message._sender = self
+    self.session.outgoing.append(message)
+    self.queued += 1
+    mno = self.queued
+
+    self._wakeup()
+
+    if sync:
+      self._ewait(lambda: self.acked >= mno)
+      assert message not in self.session.outgoing
 
   @synchronized
   def close(self):
     """
     Close the Sender.
     """
+    # XXX: should make driver do something here
     if not self.closed:
       self.session.senders.remove(self)
       self.closed = True
 
-class Empty(Exception):
+class ReceiveError(SessionError):
+  pass
+
+class Empty(ReceiveError):
   """
   Exception raised by L{Receiver.fetch} when there is no message
   available within the alloted time.
   """
   pass
 
-class Receiver(Lockable):
+class Receiver:
 
   """
   Receives incoming messages from a remote source. Messages may be
@@ -693,43 +615,39 @@
     self.destination = str(self.index)
     self.source = source
     self.filter = filter
+
     self.started = started
     self.capacity = UNLIMITED
+    self.granted = Serial(0)
+    self.drain = False
+    self.impending = Serial(0)
+    self.received = Serial(0)
+    self.returned = Serial(0)
+
+    self.closing = False
     self.closed = False
     self.listener = None
-    self._ssn = None
-    self._queue = None
     self._lock = self.session._lock
-    self._condition = self.session._condition
 
-  def _link(self):
-    self._ssn = self.session._ssn
-    result = self.session._exchange_query(self.source)
-    if result.not_found:
-      self._queue = self.source
-      # XXX: should check 'create' option
-      self._ssn.queue_declare(queue=self._queue, durable=False)
-    else:
-      self._queue = "%s.%s" % (self.session.name, self.destination)
-      self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True)
-      if self.filter is None:
-        f = FILTER_DEFAULTS[result.type]
-      else:
-        f = self.filter
-      f._bind(self._ssn, self.source, self._queue)
-    self._ssn.message_subscribe(queue=self._queue, destination=self.destination,
-                                sync=True)
-    self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit)
-    self._ssn.sync()
-    if self.started:
-      self._start()
+  def _wakeup(self):
+    self.session._wakeup()
+
+  def _check_error(self, exc=ReceiveError):
+    self.session._check_error(exc)
 
-  def _disconnected(self):
-    self._ssn = None
+  def _ewait(self, predicate, timeout=None, exc=ReceiveError):
+    return self.session._ewait(predicate, timeout, exc)
 
   @synchronized
   def pending(self):
-    return self.session._count(self._pred)
+    """
+    Returns the number of messages available to be fetched by the
+    application.
+
+    @rtype: int
+    @return: the number of available messages
+    """
+    return self.received - self.returned
 
   def _capacity(self):
     if not self.started:
@@ -762,23 +680,36 @@
     @type timeout: float
     @param timeout: the time to wait for a message to be available
     """
-    if self.capacity is not UNLIMITED or not self.started:
-      self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
-                             UNLIMITED.value)
-      self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+    if self._capacity() == 0:
+      self.granted = self.returned + 1
+      self._wakeup()
+    self._ewait(lambda: self.impending >= self.granted)
     msg = self.session._get(self._pred, timeout=timeout)
     if msg is None:
-      self._ssn.message_flush(self.destination)
-      self._start()
-      self._ssn.sync()
+      self.drain = True
+      self.granted = self.received
+      self._wakeup()
+      self._ewait(lambda: self.impending == self.received)
+      self.drain = False
+      self._grant()
+      self._wakeup()
       msg = self.session._get(self._pred, timeout=0)
       if msg is None:
         raise Empty()
+    elif self._capacity() not in (0, UNLIMITED.value):
+      self.granted += 1
+      self._wakeup()
     return msg
 
-  def _start(self):
-    self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value)
-    self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity())
+  def _grant(self):
+    if self.started:
+      if self.capacity is UNLIMITED:
+        self.granted = UNLIMITED
+      else:
+        self.granted = self.received + self._capacity()
+    else:
+      self.granted = self.received
+
 
   @synchronized
   def start(self):
@@ -786,34 +717,31 @@
     Start incoming message delivery for this receiver.
     """
     self.started = True
-    if self._ssn is not None:
-      self._start()
-
-  def _stop(self):
-    self._ssn.message_stop(self.destination)
+    self._grant()
+    self._wakeup()
 
   @synchronized
   def stop(self):
     """
     Stop incoming message delivery for this receiver.
     """
-    if self._ssn is not None:
-      self._stop()
     self.started = False
+    self._grant()
+    self._wakeup()
+    self._ewait(lambda: self.impending == self.received)
 
   @synchronized
   def close(self):
     """
     Close the receiver.
     """
-    if not self.closed:
-      self.closed = True
-      self._ssn.message_cancel(self.destination, sync=True)
-      self._ssn.sync()
+    self.closing = True
+    self._wakeup()
+    try:
+      self._ewait(lambda: self.closed)
+    finally:
       self.session.receivers.remove(self)
 
-
-
 def codec(name):
   type = PRIMITIVE[name]
 
@@ -889,6 +817,7 @@
     self.to = None
     self.reply_to = None
     self.correlation_id = None
+    self.durable = False
     self.properties = {}
     self.content_type = get_type(content)
     self.content = content
@@ -896,5 +825,7 @@
   def __repr__(self):
     return "Message(%r)" % self.content
 
-__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
-           "Empty", "timestamp", "uuid4"]
+__all__ = ["Connection", "Session", "Sender", "Receiver", "Pattern", "Message",
+           "ConnectionError", "ConnectError", "SessionError", "Disconnected",
+           "SendError", "InsufficientCapacity", "ReceiveError", "Empty",
+           "timestamp", "uuid4", "UNLIMITED", "AMQP_PORT", "AMQPS_PORT"]

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/session.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/session.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/session.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/session.py Wed Sep  9 13:05:43 2009
@@ -146,7 +146,8 @@
     if self._closing:
       raise SessionClosed()
 
-    if self.channel == None:
+    ch = self.channel
+    if ch == None:
       raise SessionDetached()
 
     if op == MessageTransfer:
@@ -162,14 +163,12 @@
     cmd = op(*args, **kwargs)
     cmd.sync = self.auto_sync or cmd.sync
     self.need_sync = not cmd.sync
-    cmd.channel = self.channel.id
+    cmd.channel = ch.id
 
     if op.RESULT:
       result = Future(exception=SessionException)
       self.results[self.sender.next_id] = result
 
-    log.debug("SENDING %s", cmd)
-
     self.send(cmd)
 
     log.debug("SENT %s", cmd)
@@ -245,13 +244,16 @@
     self._completed = RangedSet()
 
   def send(self, cmd):
+    ch = self.session.channel
+    if ch is None:
+      raise SessionDetached()
     cmd.id = self.next_id
     self.next_id += 1
     if self.session.send_id:
       self.session.send_id = False
-      self.session.channel.session_command_point(cmd.id, 0)
+      ch.session_command_point(cmd.id, 0)
     self.commands.append(cmd)
-    self.session.channel.connection.write_op(cmd)
+    ch.connection.write_op(cmd)
 
   def completed(self, commands):
     idx = 0

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/tests/messaging.py Wed Sep  9 13:05:43 2009
@@ -23,7 +23,8 @@
 import time
 from qpid.tests import Test
 from qpid.harness import Skipped
-from qpid.messaging import Connection, ConnectError, Disconnected, Empty, Message, UNLIMITED, uuid4
+from qpid.messaging import Connection, ConnectError, Disconnected, Empty, \
+    InsufficientCapacity, Message, UNLIMITED, uuid4
 from Queue import Queue, Empty as QueueEmpty
 
 class Base(Test):
@@ -71,19 +72,25 @@
     ssn.acknowledge()
     assert msg.content == content, "expected %r, got %r" % (content, msg.content)
 
-  def drain(self, rcv, limit=None):
+  def drain(self, rcv, limit=None, timeout=0, expected=None):
     contents = []
     try:
       while limit is None or len(contents) < limit:
-        contents.append(rcv.fetch(0).content)
+        contents.append(rcv.fetch(timeout=timeout).content)
     except Empty:
       pass
+    if expected is not None:
+      assert expected == contents, "expected %s, got %s" % (expected, contents)
     return contents
 
   def assertEmpty(self, rcv):
     contents = self.drain(rcv)
     assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
 
+  def assertPending(self, rcv, expected):
+    p = rcv.pending()
+    assert p == expected, "expected %s, got %s" % (expected, p)
+
   def sleep(self):
     time.sleep(self.delay())
 
@@ -107,7 +114,8 @@
     try:
       self.conn = Connection.open("localhost", 0)
       assert False, "connect succeeded"
-    except ConnectError:
+    except ConnectError, e:
+      # XXX: should verify that e includes appropriate diagnostic info
       pass
 
 class ConnectionTests(Base):
@@ -219,26 +227,27 @@
 
   # XXX, we need a convenient way to assert that required queues are
   # empty on setup, and possibly also to drain queues on teardown
-  def testAcknowledge(self):
+  def ackTest(self, acker, ack_capacity=None):
     # send a bunch of messages
     snd = self.ssn.sender("test-ack-queue")
-    tid = "a"
-    contents = ["testAcknowledge[%s, %s]" % (i, tid) for i in range(10)]
+    contents = [self.content("ackTest", i) for i in range(15)]
     for c in contents:
       snd.send(c)
 
     # drain the queue, verify the messages are there and then close
     # without acking
     rcv = self.ssn.receiver(snd.target)
-    assert contents == self.drain(rcv)
+    self.drain(rcv, expected=contents)
     self.ssn.close()
 
     # drain the queue again, verify that they are all the messages
     # were requeued, and ack this time before closing
     self.ssn = self.conn.session()
+    if ack_capacity is not None:
+      self.ssn.ack_capacity = ack_capacity
     rcv = self.ssn.receiver("test-ack-queue")
-    assert contents == self.drain(rcv)
-    self.ssn.acknowledge()
+    self.drain(rcv, expected=contents)
+    acker(self.ssn)
     self.ssn.close()
 
     # drain the queue a final time and verify that the messages were
@@ -247,6 +256,33 @@
     rcv = self.ssn.receiver("test-ack-queue")
     self.assertEmpty(rcv)
 
+  def testAcknowledge(self):
+    self.ackTest(lambda ssn: ssn.acknowledge())
+
+  def testAcknowledgeAsync(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False))
+
+  def testAcknowledgeAsyncAckCap0(self):
+    try:
+      try:
+        self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 0)
+        assert False, "acknowledge shouldn't succeed with ack_capacity of zero"
+      except InsufficientCapacity:
+        pass
+    finally:
+      self.ssn.ack_capacity = UNLIMITED
+      self.drain(self.ssn.receiver("test-ack-queue"))
+      self.ssn.acknowledge()
+
+  def testAcknowledgeAsyncAckCap1(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 1)
+
+  def testAcknowledgeAsyncAckCap5(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), 5)
+
+  def testAcknowledgeAsyncAckCapUNLIMITED(self):
+    self.ackTest(lambda ssn: ssn.acknowledge(sync=False), UNLIMITED)
+
   def send(self, ssn, queue, base, count=1):
     snd = ssn.sender(queue)
     contents = []
@@ -319,7 +355,8 @@
       txssn.acknowledge()
     else:
       txssn.rollback()
-      assert contents == self.drain(txrcv)
+      drained = self.drain(txrcv)
+      assert contents == drained, "expected %s, got %s" % (contents, drained)
       txssn.acknowledge()
       txssn.rollback()
       assert contents == self.drain(txrcv)
@@ -401,9 +438,9 @@
       elapsed = time.time() - start
       assert elapsed >= self.delay()
 
-    one = self.send("testListen", 1)
-    two = self.send("testListen", 2)
-    three = self.send("testListen", 3)
+    one = self.send("testFetch", 1)
+    two = self.send("testFetch", 2)
+    three = self.send("testFetch", 3)
     msg = self.rcv.fetch(0)
     assert msg.content == one
     msg = self.rcv.fetch(self.delay())
@@ -467,34 +504,35 @@
   def testCapacity(self):
     self.rcv.capacity = 5
     self.rcv.start()
-    assert self.rcv.pending() == 0
+    self.assertPending(self.rcv, 0)
 
     for i in range(15):
       self.send("testCapacity", i)
     self.sleep()
-    assert self.rcv.pending() == 5
+    self.assertPending(self.rcv, 5)
 
     self.drain(self.rcv, limit = 5)
     self.sleep()
-    assert self.rcv.pending() == 5
+    self.assertPending(self.rcv, 5)
 
-    self.drain(self.rcv)
-    assert self.rcv.pending() == 0
+    drained = self.drain(self.rcv)
+    assert len(drained) == 10
+    self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
 
   def testCapacityUNLIMITED(self):
     self.rcv.capacity = UNLIMITED
     self.rcv.start()
-    assert self.rcv.pending() == 0
+    self.assertPending(self.rcv, 0)
 
     for i in range(10):
       self.send("testCapacityUNLIMITED", i)
     self.sleep()
-    assert self.rcv.pending() == 10
+    self.assertPending(self.rcv, 10)
 
     self.drain(self.rcv)
-    assert self.rcv.pending() == 0
+    self.assertPending(self.rcv, 0)
 
     self.ssn.acknowledge()
 
@@ -535,6 +573,48 @@
   def testSendMap(self):
     self.checkContent({"testSendMap": self.test_id, "pie": "blueberry", "pi": 3.14})
 
+  def asyncTest(self, capacity):
+    self.snd.capacity = capacity
+    msgs = [self.content("asyncTest", i) for i in range(15)]
+    for m in msgs:
+      self.snd.send(m, sync=False)
+    drained = self.drain(self.rcv, timeout=self.delay())
+    assert msgs == drained, "expected %s, got %s" % (msgs, drained)
+    self.ssn.acknowledge()
+
+  def testSendAsyncCapacity0(self):
+    try:
+      self.asyncTest(0)
+      assert False, "send shouldn't succeed with zero capacity"
+    except InsufficientCapacity:
+      # this is expected
+      pass
+
+  def testSendAsyncCapacity1(self):
+    self.asyncTest(1)
+
+  def testSendAsyncCapacity5(self):
+    self.asyncTest(5)
+
+  def testSendAsyncCapacityUNLIMITED(self):
+    self.asyncTest(UNLIMITED)
+
+  def testCapacityTimeout(self):
+    self.snd.capacity = 1
+    msgs = []
+    caught = False
+    while len(msgs) < 100:
+      m = self.content("testCapacity", len(msgs))
+      try:
+        self.snd.send(m, sync=False, timeout=0)
+        msgs.append(m)
+      except InsufficientCapacity:
+        caught = True
+        break
+    self.drain(self.rcv, expected=msgs)
+    self.ssn.acknowledge()
+    assert caught, "did not exceed capacity"
+
 class MessageTests(Base):
 
   def testCreateString(self):

Modified: qpid/branches/java-network-refactor/qpid/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid/util.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid/util.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid/util.py Wed Sep  9 13:05:43 2009
@@ -134,3 +134,9 @@
     if self.port:
       s += ":%s" % self.port
     return s
+
+def default(value, default):
+  if value is None:
+    return default
+  else:
+    return value

Modified: qpid/branches/java-network-refactor/qpid/python/qpid_config.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/qpid_config.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/qpid_config.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/qpid_config.py Wed Sep  9 13:05:43 2009
@@ -19,7 +19,7 @@
 
 import os
 
-qpid_home = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-amqp_spec = os.path.join(qpid_home, "specs", "amqp.0-10-qpid-errata.xml")
-amqp_spec_0_8 = os.path.join(qpid_home, "specs", "amqp.0-8.xml")
-amqp_spec_0_9 = os.path.join(qpid_home, "specs", "amqp.0-9.xml")
+AMQP_SPEC_DIR=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "specs")
+amqp_spec = os.path.join(AMQP_SPEC_DIR, "amqp.0-10-qpid-errata.xml")
+amqp_spec_0_8 = os.path.join(AMQP_SPEC_DIR, "amqp.0-8.xml")
+amqp_spec_0_9 = os.path.join(AMQP_SPEC_DIR, "amqp.0-9.xml")

Modified: qpid/branches/java-network-refactor/qpid/python/tests/datatypes.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/tests/datatypes.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/tests/datatypes.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/tests/datatypes.py Wed Sep  9 13:05:43 2009
@@ -54,6 +54,19 @@
     d[serial(0)] = "zero"
     assert d[0] == "zero"
 
+  def testAdd(self):
+    assert serial(2) + 2 == serial(4)
+    assert serial(2) + 2 == 4
+
+  def testSub(self):
+    delta = serial(4) - serial(2)
+    assert isinstance(delta, int) or isinstance(delta, long)
+    assert delta == 2
+
+    delta = serial(4) - 2
+    assert isinstance(delta, Serial)
+    assert delta == serial(2)
+
 class RangedSetTest(TestCase):
 
   def check(self, ranges):

Modified: qpid/branches/java-network-refactor/qpid/python/tests_0-10/alternate_exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/tests_0-10/alternate_exchange.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/tests_0-10/alternate_exchange.py Wed Sep  9 13:05:43 2009
@@ -141,7 +141,61 @@
             session.exchange_delete(exchange="e")
             session.exchange_delete(exchange="alternate")
             self.assertEquals(530, e.args[0].error_code)
-            
+
+
+    def test_modify_existing_exchange_alternate(self):
+        """
+        Ensure that attempting to modify an exchange to change
+        the alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="alt2", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        try:
+            # attempt to change the alternate on an already existing exchange
+            session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt2")
+            self.fail("Expected changing an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt2")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_add_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exchange by adding
+        an alternate throws an exception
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="noalternate", type="fanout")
+        try:
+            # attempt to add an alternate on an already existing exchange
+            session.exchange_declare(exchange="noalternate", type="fanout", alternate_exchange="alt1")
+            self.fail("Expected adding an alternate on an existing exchange to fail")
+        except SessionException, e:
+            self.assertEquals(530, e.args[0].error_code)
+        session = self.conn.session("alternate", 2)
+        session.exchange_delete(exchange="noalternate")
+        session.exchange_delete(exchange="alt1")
+
+
+    def test_del_alternate_to_exchange(self):
+        """
+        Ensure that attempting to modify an exchange by declaring
+        it again without an alternate does nothing
+        """
+        session = self.session
+        session.exchange_declare(exchange="alt1", type="direct")
+        session.exchange_declare(exchange="onealternate", type="fanout", alternate_exchange="alt1")
+        # attempt to re-declare without an alternate - silently ignore
+        session.exchange_declare(exchange="onealternate", type="fanout" )
+        session.exchange_delete(exchange="onealternate")
+        session.exchange_delete(exchange="alt1")
+
 
     def assertEmpty(self, queue):
         try:

Modified: qpid/branches/java-network-refactor/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-network-refactor/qpid/python/tests_0-10/management.py?rev=812936&r1=812935&r2=812936&view=diff
==============================================================================
--- qpid/branches/java-network-refactor/qpid/python/tests_0-10/management.py (original)
+++ qpid/branches/java-network-refactor/qpid/python/tests_0-10/management.py Wed Sep  9 13:05:43 2009
@@ -295,3 +295,25 @@
         sleep(1)
         self.assertEqual(handler.check(), "pass")
 
+    def test_connection_close(self):
+        """
+        Test management method for closing connection
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("my-named-session")
+
+        #using qmf find named session and close the corresponding connection:
+        qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+        qmf_ssn_object._connectionRef_.close()
+
+        #check that connection is closed
+        try:
+            conn.session("another-session")
+            self.fail("Expected failure from closed connection")
+        except: None
+        
+        #make sure that the named session has been closed and the name can be re-used
+        conn = self.connect()
+        session = conn.session("my-named-session")
+        session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org