You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [40/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Oct 21 14:42:12 2011
@@ -25,11 +25,9 @@ import org.apache.qpid.test.utils.QpidBr
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,70 +47,67 @@ public class ChannelCloseTest extends Qp
     private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseTest.class);
 
     Connection _connection;
-    private String _brokerlist = "vm://:1";
     private Session _session;
     private static final long SYNC_TIMEOUT = 500;
     private int TEST = 0;
 
-    /*
-          close channel, use chanel with same id ensure error.
+    /**
+     * Close a channel, then use a channel with the same id and ensure an error results.
+     *
+     * This test is only valid for non 0-10 connections.
      */
     public void testReusingChannelAfterFullClosure() throws Exception
     {
-        // this is testing an inVM Connetion conneciton 
-        if (isJavaBroker() && !isExternalBroker())
+        _connection=newConnection();
+
+        // Create Producer
+        try
         {
-            _connection=newConnection();
+            _connection.start();
+
+            createChannelAndTest(1);
 
-            // Create Producer
+            // Cause it to close
             try
             {
-                _connection.start();
-
-                createChannelAndTest(1);
-
-                // Cause it to close
-                try
-                {
-                    _logger.info("Testing invalid exchange");
-                    declareExchange(1, "", "name_that_will_lookup_to_null", false);
-                    fail("Exchange name is empty so this should fail ");
-                }
-                catch (AMQException e)
-                {
-                    assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
-                }
+                _logger.info("Testing invalid exchange");
+                declareExchange(1, "", "name_that_will_lookup_to_null", false);
+                fail("Exchange name is empty so this should fail ");
+            }
+            catch (AMQException e)
+            {
+                assertEquals("Exchange should not be found", AMQConstant.NOT_FOUND, e.getErrorCode());
+            }
 
-                // Check that
-                try
+            // Check that
+            try
+            {
+                _logger.info("Testing valid exchange should fail");
+                declareExchange(1, "topic", "amq.topic", false);
+                fail("This should not succeed as the channel should be closed ");
+            }
+            catch (AMQException e)
+            {
+                if (_logger.isInfoEnabled())
                 {
-                    _logger.info("Testing valid exchange should fail");
-                    declareExchange(1, "topic", "amq.topic", false);
-                    fail("This should not succeed as the channel should be closed ");
+                    _logger.info("Exception occured was:" + e.getErrorCode());
                 }
-                catch (AMQException e)
-                {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Exception occured was:" + e.getErrorCode());
-                    }
 
-                    assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
+                assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
 
-                    _connection=newConnection();
-                }
+                _connection=newConnection();
+            }
 
-                checkSendingMessage();
+            checkSendingMessage();
 
-                _session.close();
-                _connection.close();
+            _session.close();
+            _connection.close();
 
-            }
-            catch (JMSException e)
-            {
-                e.printStackTrace();
-                fail(e.getMessage());
-            }
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+            fail(e.getMessage());
         }
     }
 
@@ -306,27 +301,19 @@ public class ChannelCloseTest extends Qp
 
     private Connection newConnection()
     {
-        AMQConnection connection = null;
+        Connection connection = null;
         try
         {
-            connection = new AMQConnection("amqp://guest:guest@CCTTest/test?brokerlist='" + _brokerlist + "'");
+            connection = getConnection();
 
-            connection.setConnectionListener(this);
+            ((AMQConnection) connection).setConnectionListener(this);
 
             _session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
             connection.start();
 
         }
-        catch (JMSException e)
-        {
-            fail("Creating new connection when:" + e.getMessage());
-        }
-        catch (AMQException e)
-        {
-            fail("Creating new connection when:" + e.getMessage());
-        }
-        catch (URLSyntaxException e)
+        catch (Exception e)
         {
             fail("Creating new connection when:" + e.getMessage());
         }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/CloseAfterConnectionFailureTest.java Fri Oct 21 14:42:12 2011
@@ -55,7 +55,7 @@ public class CloseAfterConnectionFailure
             try
             {
                 //Start the connection so it will use the retries
-                connection = new AMQConnection(url, null);
+                connection = new AMQConnection(url);
 
                 connection.setExceptionListener(this);
 

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionCloseTest.java Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -98,6 +99,31 @@ public class ConnectionCloseTest extends
                    delta.size() < deltaThreshold);
     }
 
+    /**
+     * This test is added due to QPID-3453 to test connection closing when AMQ
+     * session is not closed but underlying transport session is in detached
+     * state and transport connection is closed
+     */
+    public void testConnectionCloseOnOnForcibleBrokerStop() throws Exception
+    {
+        Connection connection = getConnection();
+        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        stopBroker();
+
+        // we need to close connection explicitly in order to verify that
+        // closing of connection having transport session in DETACHED state and
+        // transport connection in CLOSED state does not throw an exception
+        try
+        {
+            connection.close();
+        }
+        catch (JMSException e)
+        {
+            // connection closing should not fail
+            fail("Cannot close connection:" + e.getMessage());
+        }
+    }
+
     private void dumpStacks(Map<Thread,StackTraceElement[]> map)
     {
         for (Map.Entry<Thread,StackTraceElement[]> entry : map.entrySet())

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Fri Oct 21 14:42:12 2011
@@ -20,32 +20,31 @@
  */
 package org.apache.qpid.test.unit.client.connection;
 
+import javax.jms.Connection;
+import javax.jms.QueueSession;
+import javax.jms.TopicSession;
+
 import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.BrokerDetails;
-
-import javax.jms.Connection;
-import javax.jms.QueueSession;
-import javax.jms.TopicSession;
-import javax.naming.NamingException;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 public class ConnectionTest extends QpidBrokerTestCase
 {
 
-    String _broker_NotRunning = "vm://:2";
+    String _broker_NotRunning = "tcp://localhost:" + findFreePort();
+
     String _broker_BadDNS = "tcp://hg3sgaaw4lgihjs";
 
     public void testSimpleConnection() throws Exception
@@ -83,21 +82,21 @@ public class ConnectionTest extends Qpid
                                      + "&temporaryTopicExchange='tmp.topic'");
 
             System.err.println(url.toString());
-            conn = new AMQConnection(url, null);
+            conn = new AMQConnection(url);
 
 
             AMQSession sess = (AMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            
-            sess.declareExchange(new AMQShortString("test.direct"), 
+
+            sess.declareExchange(new AMQShortString("test.direct"),
                     ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
 
-            sess.declareExchange(new AMQShortString("tmp.direct"), 
+            sess.declareExchange(new AMQShortString("tmp.direct"),
                     ExchangeDefaults.DIRECT_EXCHANGE_CLASS, false);
 
-            sess.declareExchange(new AMQShortString("tmp.topic"), 
+            sess.declareExchange(new AMQShortString("tmp.topic"),
                     ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
 
-            sess.declareExchange(new AMQShortString("test.topic"), 
+            sess.declareExchange(new AMQShortString("test.topic"),
                     ExchangeDefaults.TOPIC_EXCHANGE_CLASS, false);
 
             QueueSession queueSession = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -113,7 +112,7 @@ public class ConnectionTest extends Qpid
             queueSession.close();
 
             TopicSession topicSession = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-            
+
             AMQTopic topic = (AMQTopic) topicSession.createTopic("silly.topic");
 
             assertEquals(topic.getExchangeName().toString(), "test.topic");
@@ -271,7 +270,7 @@ public class ConnectionTest extends Qpid
         }
         connection.close();
     }
-    
+
     public void testUnsupportedSASLMechanism() throws Exception
     {
         BrokerDetails broker = getBroker();
@@ -287,10 +286,64 @@ public class ConnectionTest extends Qpid
         }
         catch (Exception e)
         {
-            assertTrue("Incorrect exception thrown",
-                       e.getMessage().contains("The following SASL mechanisms " +
-                       "[MY_MECH]"  + 
-                       " specified by the client are not supported by the broker"));
+            assertTrue("Unexpected exception message : " + e.getMessage(),
+                       e.getMessage().contains("Client and broker have no SASL mechanisms in common."));
+            assertTrue("Unexpected exception message : " + e.getMessage(),
+                    e.getMessage().contains("Client restricted itself to : MY_MECH"));
+
+        }
+    }
+
+    /**
+     * Tests that when the same user connects twice with same clientid, the second connection
+     * fails if the clientid verification feature is enabled (which uses a dummy 0-10 Session
+     * with the clientid as its name to detect the previous usage of the clientid by the user)
+     */
+    public void testClientIDVerificationForSameUser() throws Exception
+    {
+        setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+        BrokerDetails broker = getBroker();
+        try
+        {
+            Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+                                        "client_id", "test");
+
+            Connection con2 = new AMQConnection(broker.toString(), "guest", "guest",
+                                        "client_id", "test");
+
+            fail("The client should throw a ConnectionException stating the" +
+                    " client ID is not unique");
+        }
+        catch (Exception e)
+        {
+            assertTrue("Incorrect exception thrown: " + e.getMessage(),
+                       e.getMessage().contains("ClientID must be unique"));
+        }
+    }
+
+    /**
+     * Tests that when different users connect with the same clientid, the second connection
+     * succeeds even though the clientid verification feature is enabled (which uses a dummy
+     * 0-10 Session with the clientid as its name; these are only verified unique on a
+     * per-principal basis)
+     */
+    public void testClientIDVerificationForDifferentUsers() throws Exception
+    {
+        setTestSystemProperty(ClientProperties.QPID_VERIFY_CLIENT_ID, "true");
+
+        BrokerDetails broker = getBroker();
+        try
+        {
+            Connection con = new AMQConnection(broker.toString(), "guest", "guest",
+                                        "client_id", "test");
+
+            Connection con2 = new AMQConnection(broker.toString(), "admin", "admin",
+                                        "client_id", "test");
+        }
+        catch (Exception e)
+        {
+            fail("Unexpected exception thrown, client id was not unique but usernames were different! " + e.getMessage());
         }
     }
 

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/ExceptionListenerTest.java Fri Oct 21 14:42:12 2011
@@ -47,13 +47,15 @@ public class ExceptionListenerTest exten
         {
             public void onException(JMSException e)
             {
+                _logger.debug("&&&&&&&&&&&&&&&&&&&&&&&&&&&& Caught exception &&&&&&&&&&&&&&&&&&&&&&&&&&&& ", e);
                 fired.countDown();
             }
         });
-
+        _logger.debug("%%%%%%%%%%%%%%%% Stopping Broker %%%%%%%%%%%%%%%%%%%%%");
         stopBroker();
+        _logger.debug("%%%%%%%%%%%%%%%% Stopped Broker  %%%%%%%%%%%%%%%%%%%%%");
 
-        if (!fired.await(3, TimeUnit.SECONDS))
+        if (!fired.await(5, TimeUnit.SECONDS))
         {
             fail("exception listener was not fired");
         }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/message/ObjectMessageTest.java Fri Oct 21 14:42:12 2011
@@ -20,14 +20,10 @@
  */
 package org.apache.qpid.test.unit.client.message;
 
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -35,10 +31,13 @@ import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ObjectMessageTest extends QpidBrokerTestCase implements MessageListener
 {
@@ -67,7 +66,7 @@ public class ObjectMessageTest extends Q
         connection.start();
 
         // create a publisher
-        producer = session.createProducer(destination, false, false, true);
+        producer = session.createProducer(destination, false, false);
         A a1 = new A(1, "A");
         A a2 = new A(2, "a");
         B b = new B(1, "B");

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java Fri Oct 21 14:42:12 2011
@@ -25,27 +25,26 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 
-import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.TestNetworkConnection;
 
 public class AMQProtocolSessionTest extends QpidBrokerTestCase
 {
-    private static class AMQProtSession extends AMQProtocolSession
+    private static class TestProtocolSession extends AMQProtocolSession
     {
 
-        public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+        public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
         {
             super(protocolHandler,connection);
         }
 
-        public TestNetworkDriver getNetworkDriver()
+        public TestNetworkConnection getNetworkConnection()
         {
-            return (TestNetworkDriver) _protocolHandler.getNetworkDriver();
+            return (TestNetworkConnection) _protocolHandler.getNetworkConnection();
         }
 
         public AMQShortString genQueueName()
@@ -54,7 +53,7 @@ public class AMQProtocolSessionTest exte
         }
     }
 
-    private AMQProtSession _testSession;
+    private TestProtocolSession _testSession;
 
     protected void setUp() throws Exception
     {
@@ -62,10 +61,10 @@ public class AMQProtocolSessionTest exte
 
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con);
-        protocolHandler.setNetworkDriver(new TestNetworkDriver());
-        
+        protocolHandler.setNetworkConnection(new TestNetworkConnection());
+
         //don't care about the values set here apart from the dummy IoSession
-        _testSession = new AMQProtSession(protocolHandler , con);
+        _testSession = new TestProtocolSession(protocolHandler , con);
     }
     
     public void testTemporaryQueueWildcard() throws UnknownHostException
@@ -93,14 +92,9 @@ public class AMQProtocolSessionTest exte
         checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
     }
     
-    public void testTemporaryQueuePipe() throws UnknownHostException
-    {
-        checkTempQueueName(new VmPipeAddress(1), "tmp_vm_1_1");
-    }
-    
     private void checkTempQueueName(SocketAddress address, String queueName)
     {
-        _testSession.getNetworkDriver().setLocalAddress(address);
+        _testSession.getNetworkConnection().setLocalAddress(address);
         assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString());
     }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/client/temporaryqueue/TemporaryQueueTest.java Fri Oct 21 14:42:12 2011
@@ -22,237 +22,145 @@
 package org.apache.qpid.test.unit.client.temporaryqueue;
 
 import javax.jms.Connection;
-import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
-import junit.framework.Assert;
 
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
-
-public class TemporaryQueueTest extends QpidBrokerTestCase implements ExceptionListener
+/**
+ * Tests the behaviour of {@link TemporaryQueue}.
+ */
+public class TemporaryQueueTest extends QpidBrokerTestCase
 {
-    private List<Exception> _exceptions = new ArrayList<Exception>();
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-       super.tearDown();
-    }
-
-    protected Connection createConnection() throws Exception
-    {
-        return  getConnection("guest", "guest");
-    }
-
-    public void testTemporaryQueue() throws Exception
-    {
-        Connection conn = createConnection();
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        TemporaryQueue queue = session.createTemporaryQueue();
+    /**
+     * Tests the basic produce/consume behaviour of a temporary queue.
+     */
+    public void testMessageDeliveryUsingTemporaryQueue() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session.createTemporaryQueue();
         assertNotNull(queue);
-        MessageProducer producer = session.createProducer(queue);
-        MessageConsumer consumer = session.createConsumer(queue);
+        final MessageProducer producer = session.createProducer(queue);
+        final MessageConsumer consumer = session.createConsumer(queue);
         conn.start();
         producer.send(session.createTextMessage("hello"));
         TextMessage tm = (TextMessage) consumer.receive(2000);
-        assertNotNull(tm);
+        assertNotNull("Message not received", tm);
         assertEquals("hello", tm.getText());
+    }
 
-        try
-        {
-            queue.delete();
-            fail("Expected JMSException : should not be able to delete while there are active consumers");
-        }
-        catch (JMSException je)
-        {
-            ; //pass
-        }
-
-        consumer.close();
+    /**
+     * Tests that a temporary queue cannot be used by another {@link Session}.
+     */
+    public void testUseFromAnotherSessionProhibited() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Session session2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session1.createTemporaryQueue();
+        assertNotNull(queue);
 
         try
         {
-            queue.delete();
+            session2.createConsumer(queue);
+            fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
         }
         catch (JMSException je)
         {
-            fail("Unexpected Exception: " + je.getMessage());
+            //pass
+            assertEquals("Cannot consume from a temporary destination created on another session", je.getMessage());
         }
-
-        conn.close();
-    }
-
-    public void tUniqueness() throws Exception
-    {
-        int numProcs = Runtime.getRuntime().availableProcessors();
-        final int threadsProc = 5;
-
-        runUniqueness(1, 10);
-        runUniqueness(numProcs * threadsProc, 10);
-        runUniqueness(numProcs * threadsProc, 100);
-        runUniqueness(numProcs * threadsProc, 500);
     }
 
-    void runUniqueness(int makers, int queues) throws Exception
-    {
-        Connection connection = createConnection();
-
-        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    /**
+     * Tests that the client is able to explicitly delete a temporary queue using
+     * {@link TemporaryQueue#delete()} and is prevented from deleting one that
+     * still has consumers.
+     *
+     * Note: Under < 0-10 {@link TemporaryQueue#delete()} only marks the queue as deleted
+     * on the client. 0-10 causes the queue to be deleted from the Broker.
+     */
+    public void testExplictTemporaryQueueDeletion() throws Exception
+    {
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; // Required to observe the queue binding on the Broker
+        final TemporaryQueue queue = session.createTemporaryQueue();
+        assertNotNull(queue);
+        final MessageConsumer consumer = session.createConsumer(queue);
+        conn.start();
 
-        List<TempQueueMaker> tqList = new LinkedList<TempQueueMaker>();
+        assertTrue("Queue should be bound", amqSession.isQueueBound((AMQDestination)queue));
 
-        //Create Makers
-        for (int m = 0; m < makers; m++)
+        try
         {
-            tqList.add(new TempQueueMaker(session, queues));
+            queue.delete();
+            fail("Expected JMSException : should not be able to delete while there are active consumers");
         }
-
-
-        List<Thread> threadList = new LinkedList<Thread>();
-
-        //Create Makers
-        for (TempQueueMaker maker : tqList)
+        catch (JMSException je)
         {
-            threadList.add(new Thread(maker));
+            //pass
+            assertEquals("Temporary Queue has consumers so cannot be deleted", je.getMessage());
         }
+        consumer.close();
 
-        //Start threads
-        for (Thread thread : threadList)
-        {
-            thread.start();
-        }
+        // Now deletion should succeed.
+        queue.delete();
 
-        // Join Threads
-        for (Thread thread : threadList)
+        try
         {
-            try
-            {
-                thread.join();
-            }
-            catch (InterruptedException e)
-            {
-                fail("Couldn't correctly join threads");
-            }
+            session.createConsumer(queue);
+            fail("Exception not thrown");
         }
-
-
-        List<AMQQueue> list = new LinkedList<AMQQueue>();
-
-        // Test values
-        for (TempQueueMaker maker : tqList)
+        catch (JMSException je)
         {
-            check(maker, list);
+            //pass
+            assertEquals("Cannot consume from a deleted destination", je.getMessage());
         }
 
-        Assert.assertEquals("Not enough queues made.", makers * queues, list.size());
-
-        connection.close();
-    }
-
-    private void check(TempQueueMaker tq, List<AMQQueue> list)
-    {
-        for (AMQQueue q : tq.getList())
+        if (isBroker010())
         {
-            if (list.contains(q))
-            {
-                fail(q + " already exists.");
-            }
-            else
-            {
-                list.add(q);
-            }
+            assertFalse("Queue should no longer be bound", amqSession.isQueueBound((AMQDestination)queue));
         }
     }
 
-
-    class TempQueueMaker implements Runnable
+    /**
+     * Tests that a temporary queue remains available for reuse even after its initial
+     * consumer has disconnected.
+     *
+     *  This test would fail under protocols < 0-10 as their temporary queues are deleted automatically
+     *  (broker side) after the last consumer disconnects (so message2 would be lost). For this
+     *  reason this test is excluded from those profiles.
+     */
+    public void testTemporaryQueueReused() throws Exception
     {
-        List<AMQQueue> _queues;
-        Session _session;
-        private int _count;
-
-
-        TempQueueMaker(Session session, int queues) throws JMSException
-        {
-            _queues = new LinkedList<AMQQueue>();
-
-            _count = queues;
-
-            _session = session;
-        }
-
-        public void run()
-        {
-            int i = 0;
-            try
-            {
-                for (; i < _count; i++)
-                {
-                    _queues.add((AMQQueue) _session.createTemporaryQueue());
-                }
-            }
-            catch (JMSException jmse)
-            {
-                //stop
-            }
-        }
-
-        List<AMQQueue> getList()
-        {
-            return _queues;
-        }
-    }
+        final Connection conn = getConnection();
+        final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final TemporaryQueue queue = session.createTemporaryQueue();
+        assertNotNull(queue);
 
-    public void testQPID1217() throws Exception
-    {
-        Connection conA = getConnection();
-        conA.setExceptionListener(this);
-        Session sessA = conA.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        TemporaryQueue temp = sessA.createTemporaryQueue();
-        
-        MessageProducer prod = sessA.createProducer(temp);
-        prod.send(sessA.createTextMessage("hi"));
-
-        Thread.sleep(500);
-        assertTrue("Exception received", _exceptions.isEmpty());
-        
-        Connection conB = getConnection();
-        Session sessB = conB.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        
-        JMSException ex = null;
-        try
-        {
-            MessageConsumer consB = sessB.createConsumer(temp);
-        } 
-        catch (JMSException e)
-        {
-            ex = e; 
-        }
-        assertNotNull(ex);
-    }
-    
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(TemporaryQueueTest.class);
-    }
+        final MessageProducer producer1 = session.createProducer(queue);
+        final MessageConsumer consumer1 = session.createConsumer(queue);
+        conn.start();
+        producer1.send(session.createTextMessage("message1"));
+        producer1.send(session.createTextMessage("message2"));
+        TextMessage tm = (TextMessage) consumer1.receive(2000);
+        assertNotNull("Message not received by first consumer", tm);
+        assertEquals("message1", tm.getText());
+        consumer1.close();
 
-    public void onException(JMSException arg0)
-    {
-        _exceptions.add(arg0);
+        final MessageConsumer consumer2 = session.createConsumer(queue);
+        conn.start();
+        tm = (TextMessage) consumer2.receive(2000);
+        assertNotNull("Message not received by second consumer", tm);
+        assertEquals("message2", tm.getText());
+        consumer2.close();
     }
-
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Fri Oct 21 14:42:12 2011
@@ -50,7 +50,6 @@ public class MessageRequeueTest extends 
     protected final String queue = "direct://amq.direct//message-requeue-test-queue";
     protected String payload = "Message:";
 
-    //protected final String BROKER = "vm://:1";
     protected final String BROKER = "tcp://127.0.0.1:5672";
     private boolean testReception = true;
 
@@ -155,8 +154,8 @@ public class MessageRequeueTest extends 
         _logger.info("consumed: " + messagesReceived);
 
         assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
-        // wit 0_10 we can have a delivery tag of 0
-        if (conn.isBroker08())
+        // with 0_10 we can have a delivery tag of 0
+        if (!conn.isBroker010())
         {
             for (long b : messageLog)
             {
@@ -224,7 +223,7 @@ public class MessageRequeueTest extends 
         StringBuilder list = new StringBuilder();
         list.append("Failed to receive:");
         int failed = 0;
-        if (conn.isBroker08())
+        if (!conn.isBroker010())
         {
             for (long b : receieved)
             {

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java Fri Oct 21 14:42:12 2011
@@ -52,7 +52,7 @@ public class DurableSubscriberTest exten
      */
     public void testDurSubRestoredAfterNonPersistentMessageSent() throws Exception
     {
-        if (isBrokerStorePersistent() || !isBroker08())
+        if (isBrokerStorePersistent())
         {
             TopicConnectionFactory factory = getConnectionFactory();
             Topic topic = (Topic) getInitialContext().lookup(_topicName);
@@ -116,7 +116,7 @@ public class DurableSubscriberTest exten
      */
     public void testDurSubRestoresMessageSelector() throws Exception
     {
-        if (isBrokerStorePersistent() || !isBroker08())
+        if (isBrokerStorePersistent())
         {
             TopicConnectionFactory factory = getConnectionFactory();
             Topic topic = (Topic) getInitialContext().lookup(_topicName);

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/JMSPropertiesTest.java Fri Oct 21 14:42:12 2011
@@ -25,12 +25,14 @@ import org.apache.qpid.client.AMQConnect
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.message.NonQpidObjectMessage;
+import org.apache.qpid.client.message.QpidMessageProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -39,7 +41,11 @@ import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.Topic;
+
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author Apache Software Foundation
@@ -163,4 +169,39 @@ public class JMSPropertiesTest extends Q
         con.close();
     }
 
+    /**
+     * Test Goal : Test if custom message properties can be set and retrieved properly without an error.
+     *             Also test if unsupported properties are filtered out. See QPID-2930.
+     */
+    public void testQpidExtensionProperties() throws Exception
+    {
+        Connection con = getConnection("guest", "guest");
+        Session ssn = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        con.start();
+
+        Topic topic = ssn.createTopic("test");
+        MessageConsumer consumer = ssn.createConsumer(topic);
+        MessageProducer prod = ssn.createProducer(topic);
+        Message m = ssn.createMessage();
+        m.setObjectProperty("foo-bar", "foobar".getBytes());
+        m.setObjectProperty(QpidMessageProperties.AMQP_0_10_APP_ID, "my-app-id");
+        prod.send(m);
+
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+
+    	Enumeration<String> enu = msg.getPropertyNames();
+    	Map<String,String> map = new HashMap<String,String>();
+    	while (enu.hasMoreElements())
+    	{
+    	    String name = enu.nextElement();
+    	    String value = msg.getStringProperty(name);
+    		map.put(name, value);
+       }
+
+       assertFalse("Property 'foo-bar' should have been filtered out",map.containsKey("foo-bar"));
+       assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_APP_ID + " should be present","my-app-id",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_APP_ID));
+       assertEquals("Property "+ QpidMessageProperties.AMQP_0_10_ROUTING_KEY + " should be present","test",msg.getStringProperty(QpidMessageProperties.AMQP_0_10_ROUTING_KEY));
+       
+    }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.test.unit.message;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQHeadersExchange;
 import org.apache.qpid.client.AMQQueue;
@@ -50,21 +54,8 @@ import javax.jms.StreamMessage;
  */
 public class StreamMessageTest extends QpidBrokerTestCase
 {
-
     private static final Logger _logger = LoggerFactory.getLogger(StreamMessageTest.class);
 
-    public String _connectionString = "vm://:1";
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
     public void testStreamMessageEOF() throws Exception
     {
         Connection con = (AMQConnection) getConnection("guest", "guest");
@@ -114,6 +105,7 @@ public class StreamMessageTest extends Q
         try
         {
             msg2.readByte();
+            fail("Expected exception not thrown");
         }
         catch (Exception e)
         {
@@ -125,6 +117,9 @@ public class StreamMessageTest extends Q
 
     public void testModifyReceivedMessageExpandsBuffer() throws Exception
     {
+        final CountDownLatch awaitMessages = new CountDownLatch(1);
+        final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<Throwable>();
+
         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
         AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
         AMQQueue queue = new AMQQueue(con.getDefaultQueueExchangeName(), new AMQShortString("testQ"));
@@ -134,28 +129,38 @@ public class StreamMessageTest extends Q
 
                 public void onMessage(Message message)
                 {
-                    StreamMessage sm = (StreamMessage) message;
+                    final StreamMessage sm = (StreamMessage) message;
                     try
                     {
                         sm.clearBody();
+                        // it is legal to extend a stream message's content
                         sm.writeString("dfgjshfslfjshflsjfdlsjfhdsljkfhdsljkfhsd");
                     }
-                    catch (JMSException e)
+                    catch (Throwable t)
+                    {
+                        listenerCaughtException.set(t);
+                    }
+                    finally
                     {
-                        _logger.error("Error when writing large string to received msg: " + e, e);
-                        fail("Error when writing large string to received msg" + e);
+                        awaitMessages.countDown();
                     }
                 }
             });
 
         Connection con2 = (AMQConnection) getConnection("guest", "guest");
         AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        MessageProducer mandatoryProducer = producerSession.createProducer(queue);
+        MessageProducer producer = producerSession.createProducer(queue);
         con.start();
         StreamMessage sm = producerSession.createStreamMessage();
         sm.writeInt(42);
-        mandatoryProducer.send(sm);
-        Thread.sleep(2000);
+        producer.send(sm);
+
+        // Allow up to five seconds for the message to arrive with the consumer
+        final boolean completed = awaitMessages.await(5, TimeUnit.SECONDS);
+        assertTrue("Message did not arrive with consumer within a reasonable time", completed);
+        final Throwable listenerException = listenerCaughtException.get();
+        assertNull("No exception should be caught by listener : " + listenerException, listenerException);
+
         con.close();
         con2.close();
     }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/message/UTF8Test.java Fri Oct 21 14:42:12 2011
@@ -20,17 +20,20 @@
  */
 package org.apache.qpid.test.unit.message;
 
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.Properties;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.naming.InitialContext;
-import javax.jms.*;
-import java.util.Properties;
-import java.io.*;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 
 /**
@@ -41,8 +44,6 @@ import java.io.*;
  */
 public class UTF8Test extends QpidBrokerTestCase
 {
-    private static final Logger _logger = LoggerFactory.getLogger(UTF8Test.class);
-
     public void testPlainEn() throws Exception
     {
          invoke("UTF8En");
@@ -65,38 +66,24 @@ public class UTF8Test extends QpidBroker
 
     private void runTest(String exchangeName, String queueName, String routingKey, String data) throws Exception
     {
-        _logger.info("Running test for exchange: " + exchangeName
-                + " queue Name: " + queueName
-                + " routing key: " + routingKey);       
-        declareQueue(exchangeName, routingKey, queueName);
-
-        javax.jms.Connection con =  getConnection();
-        javax.jms.Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
-        Destination dest = getDestination(exchangeName, routingKey, queueName);
+        Connection con =  getConnection();
+        Session sess = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+        final Destination dest = getDestination(exchangeName, routingKey, queueName);
+
+        final MessageConsumer msgCons = sess.createConsumer(dest);
+        con.start();
+
         // Send data
         MessageProducer msgProd = sess.createProducer(dest);
         TextMessage message = sess.createTextMessage(data);
         msgProd.send(message);
+
         // consume data
-        MessageConsumer msgCons = sess.createConsumer(dest);
-        con.start();
         TextMessage m = (TextMessage) msgCons.receive(RECEIVE_TIMEOUT);
         assertNotNull(m);
         assertEquals(m.getText(), data);
     }
 
-    private void declareQueue(String exch, String routkey, String qname) throws Exception
-    {
-        Connection conn = new Connection();
-        conn.connect("localhost", QpidBrokerTestCase.DEFAULT_PORT, "test", "guest", "guest",false);
-        Session sess = conn.createSession(0);
-        sess.exchangeDeclare(exch, "direct", null, null);
-        sess.queueDeclare(qname, null, null);
-        sess.exchangeBind(qname, exch, routkey, null);
-        sess.sync();
-        conn.close();        
-    }
-
     private Destination getDestination(String exch, String routkey, String qname) throws Exception
     {
         Properties props = new Properties();

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Oct 21 14:42:12 2011
@@ -262,7 +262,7 @@ public class DurableSubscriptionTest ext
         producer.send(session1.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 1 should get message 'B'.", msg);
         assertEquals("Incorrect Message received on consumer1.", "B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
@@ -287,13 +287,13 @@ public class DurableSubscriptionTest ext
         else
         {
             _logger.info("Receive message on consumer 3 :expecting B");
-            msg = consumer3.receive(500);
+            msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
             assertNotNull("Consumer 3 should get message 'B'.", msg);
             assertEquals("Incorrect Message received on consumer3.", "B", ((TextMessage) msg).getText());
         }
 
         _logger.info("Receive message on consumer 1 :expecting C");
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 1 should get message 'C'.", msg);
         assertEquals("Incorrect Message received on consumer1.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 1 :expecting null");
@@ -301,7 +301,7 @@ public class DurableSubscriptionTest ext
         assertNull("There should be no more messages for consumption on consumer1.", msg);
 
         _logger.info("Receive message on consumer 3 :expecting C");
-        msg = consumer3.receive(500);
+        msg = consumer3.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Consumer 3 should get message 'C'.", msg);
         assertEquals("Incorrect Message received on consumer3.", "C", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
@@ -358,7 +358,7 @@ public class DurableSubscriptionTest ext
         // Send message and check that both consumers get it and only it.
         producer.send(session0.createTextMessage("A"));
 
-        msg = consumer1.receive(500);
+        msg = consumer1.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull("Message should be available", msg);
         assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
         msg = consumer1.receive(500);
@@ -729,7 +729,7 @@ public class DurableSubscriptionTest ext
 
         conn.start();
         
-        Message rMsg = subB.receive(1000);
+        Message rMsg = subB.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelectorAndRestart2",
@@ -797,7 +797,7 @@ public class DurableSubscriptionTest ext
         
         conn.start();
         
-        Message rMsg = subTwo.receive(1000);
+        Message rMsg = subTwo.receive(POSITIVE_RECEIVE_TIMEOUT);
         assertNotNull(rMsg);
         assertEquals("Content was wrong", 
                      "testResubscribeWithChangedSelectorAndRestart1",

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Fri Oct 21 14:42:12 2011
@@ -20,38 +20,28 @@
  */
 package org.apache.qpid.test.unit.topic;
 
+import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQTopicSessionAdaptor;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
 
 /** @author Apache Software Foundation */
 public class TopicSessionTest extends QpidBrokerTestCase
 {
-
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
-
     public void testTopicSubscriptionUnsubscription() throws Exception
     {
 
@@ -228,83 +218,6 @@ public class TopicSessionTest extends Qp
         con.close();
     }
 
-    public void testSendingSameMessage() throws Exception
-    {
-        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
-        TemporaryTopic topic = session.createTemporaryTopic();
-        assertNotNull(topic);
-        TopicPublisher producer = session.createPublisher(topic);
-        MessageConsumer consumer = session.createConsumer(topic);
-        conn.start();
-        TextMessage sentMessage = session.createTextMessage("Test Message");
-        producer.send(sentMessage);
-        session.commit();
-        TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
-        assertNotNull(receivedMessage);
-        assertEquals(sentMessage.getText(), receivedMessage.getText());
-        producer.send(sentMessage);
-        session.commit();
-        receivedMessage = (TextMessage) consumer.receive(2000);
-        assertNotNull(receivedMessage);
-        assertEquals(sentMessage.getText(), receivedMessage.getText());
-        session.commit();
-        conn.close();
-
-    }
-
-    public void testTemporaryTopic() throws Exception
-    {
-        AMQConnection conn = (AMQConnection) getConnection("guest", "guest");
-        TopicSession session = conn.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
-        TemporaryTopic topic = session.createTemporaryTopic();
-        assertNotNull(topic);
-        TopicPublisher producer = session.createPublisher(topic);
-        MessageConsumer consumer = session.createConsumer(topic);
-        conn.start();
-        producer.send(session.createTextMessage("hello"));
-        session.commit();
-        TextMessage tm = (TextMessage) consumer.receive(2000);
-        assertNotNull(tm);
-        assertEquals("hello", tm.getText());
-        session.commit();
-        try
-        {
-            topic.delete();
-            fail("Expected JMSException : should not be able to delete while there are active consumers");
-        }
-        catch (JMSException je)
-        {
-            ; //pass
-        }
-
-        consumer.close();
-
-        try
-        {
-            topic.delete();
-        }
-        catch (JMSException je)
-        {
-            fail("Unexpected Exception: " + je.getMessage());
-        }
-
-        TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
-        try
-        {
-            session2.createConsumer(topic);
-            fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
-        }
-        catch (JMSException je)
-        {
-            ; // pass
-        }
-
-
-        conn.close();
-    }
-
-
     public void testNoLocal() throws Exception
     {
 
@@ -398,56 +311,39 @@ public class TopicSessionTest extends Qp
     }
 
     /**
-     * This tests QPID-1191, where messages which are sent to a topic but are not consumed by a subscriber
-     * due to a selector can be leaked.
-     * @throws Exception
+     * This test was added to demonstrate QPID-3542.  The Java Client when used with the CPP Broker was failing to
+     * ack messages received that did not match the selector.  This meant the messages remained indefinitely on the Broker.
      */
-    public void testNonMatchingMessagesDoNotFillQueue() throws Exception
+    public void testNonMatchingMessagesHandledCorrectly() throws Exception
     {
-        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-
-        // Setup Topic
-        AMQTopic topic = new AMQTopic(con, "testNoLocal");
-
-        TopicSession session = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
+        final String topicName = getName();
+        final String clientId = "clientId" + topicName;
+        final Connection con1 = getConnection();
+        final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Topic topic1 = session1.createTopic(topicName);
+        final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
 
         // Setup subscriber with selector
-        TopicSubscriber selector = session.createSubscriber(topic,  "Selector = 'select'", false);
-        TopicPublisher publisher = session.createPublisher(topic);
+        final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
+        final MessageProducer publisher = session1.createProducer(topic1);
 
-        con.start();
-        TextMessage m;
-        TextMessage message;
+        con1.start();
 
         // Send non-matching message
-        message = session.createTextMessage("non-matching 1");
-        publisher.publish(message);
-        session.commit();
-
-        // Send and consume matching message
-        message = session.createTextMessage("hello");
-        message.setStringProperty("Selector", "select");
-
-        publisher.publish(message);
-        session.commit();
-
-        m = (TextMessage) selector.receive(1000);
-        assertNotNull("should have received message", m);
-        assertEquals("Message contents were wrong", "hello", m.getText());
-
-        // Send non-matching message
-        message = session.createTextMessage("non-matching 2");
-        publisher.publish(message);
-        session.commit();
-
-        // Assert queue count is 0
-        long depth = ((AMQTopicSessionAdaptor) session).getSession().getQueueDepth(topic);
-        assertEquals("Queue depth was wrong", 0, depth);
-
-    }
-
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(TopicSessionTest.class);
+        final Message sentMessage = session1.createTextMessage("hello");
+        sentMessage.setStringProperty("Selector", "nonMatch");
+        publisher.send(sentMessage);
+
+        // Try to consume the non-matching message; expect nothing to be received.
+        final Message message1 = subscriberWithSelector.receive(1000);
+        assertNull("should not have received message", message1);
+        subscriberWithSelector.close();
+
+        session1.close();
+
+        // Now verify queue depth on broker.
+        final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+        assertEquals("Expected queue depth of zero", 0, depth);
     }
 }

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/FailoverBaseCase.java Fri Oct 21 14:42:12 2011
@@ -34,24 +34,10 @@ public class FailoverBaseCase extends Qp
 {
     protected static final Logger _logger = LoggerFactory.getLogger(FailoverBaseCase.class);
 
-    public static int FAILING_VM_PORT = 2;
-    public static int FAILING_PORT = Integer.parseInt(System.getProperty("test.port.alt"));
     public static final long DEFAULT_FAILOVER_TIME = 10000L;
 
     protected int failingPort;
 
-    protected int getFailingPort()
-    {
-        if (_broker.equals(VM))
-        {
-            return FAILING_VM_PORT;
-        }
-        else
-        {
-        	return FAILING_PORT;
-        }
-    }
-
     protected void setUp() throws java.lang.Exception
     {
         super.setUp();
@@ -82,6 +68,14 @@ public class FailoverBaseCase extends Qp
         return _connectionFactory;
     }
 
+    @Override
+    public void stopBroker(int port) throws Exception
+    { // Guarded variant of the superclass stopBroker(port).
+        if (isBrokerPresent(port)) // delegate only when isBrokerPresent(port) is true; otherwise do nothing
+        {
+            super.stopBroker(port);
+        }
+    }
 
     public void tearDown() throws Exception
     {

Modified: qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java (original)
+++ qpid/branches/QPID-2519/java/systests/src/main/java/org/apache/qpid/test/utils/JMXTestUtils.java Fri Oct 21 14:42:12 2011
@@ -21,6 +21,8 @@
 package org.apache.qpid.test.utils;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import javax.management.JMException;
@@ -31,14 +33,17 @@ import javax.management.ObjectName;
 import javax.management.MalformedObjectNameException;
 import javax.management.remote.JMXConnector;
 
+import junit.framework.TestCase;
+
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.commands.objects.AllObjects;
 import org.apache.qpid.management.common.JMXConnnectionFactory;
 import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedConnection;
 import org.apache.qpid.management.common.mbeans.ManagedExchange;
 import org.apache.qpid.management.common.mbeans.LoggingManagement;
 import org.apache.qpid.management.common.mbeans.ConfigurationManagement;
 import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.management.common.mbeans.ServerInformation;
 import org.apache.qpid.management.common.mbeans.UserManagement;
 
 /**
@@ -231,10 +236,10 @@ public class JMXTestUtils
     public ObjectName getVirtualHostManagerObjectName(String vhostName)
     {
         // Get the name of the test manager
-        AllObjects allObject = new AllObjects(_mbsc);
-        allObject.querystring = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" + vhostName + ",*";
+        String query = "org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="
+                       + ObjectName.quote(vhostName) + ",*";
 
-        Set<ObjectName> objectNames = allObject.returnObjects();
+        Set<ObjectName> objectNames = queryObjects(query);
 
         _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("Incorrect number test vhosts returned", 1, objectNames.size());
@@ -258,14 +263,14 @@ public class JMXTestUtils
     public ObjectName getQueueObjectName(String virtualHostName, String queue)
     {
         // Get the name of the test manager
-        AllObjects allObject = new AllObjects(_mbsc);
-        allObject.querystring = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + virtualHostName + ",name=" + queue + ",*";
+        String query = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost="
+                       + ObjectName.quote(virtualHostName) + ",name="
+                       + ObjectName.quote(queue) + ",*";
 
-        Set<ObjectName> objectNames = allObject.returnObjects();
+        Set<ObjectName> objectNames = queryObjects(query);
 
         _test.assertNotNull("Null ObjectName Set returned", objectNames);
-        _test.assertEquals("Incorrect number of queues with name '" + allObject.querystring +
-                           "' returned", 1, objectNames.size());
+        _test.assertEquals("Incorrect number of queues with name '" + queue + "' returned", 1, objectNames.size());
 
         // We have verified we have only one value in objectNames so return it
         ObjectName objectName = objectNames.iterator().next();
@@ -286,10 +291,11 @@ public class JMXTestUtils
     public ObjectName getExchangeObjectName(String virtualHostName, String exchange)
     {
         // Get the name of the test manager
-        AllObjects allObject = new AllObjects(_mbsc);
-        allObject.querystring = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" + virtualHostName + ",name=" + exchange + ",*";
+        String query = "org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="
+                       + ObjectName.quote(virtualHostName) + ",name="
+                       + ObjectName.quote(exchange) + ",*";
 
-        Set<ObjectName> objectNames = allObject.returnObjects();
+        Set<ObjectName> objectNames = queryObjects(query);
 
         _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("Incorrect number of exchange with name '" + exchange + "' returned", 1, objectNames.size());
@@ -301,12 +307,9 @@ public class JMXTestUtils
     }
 
     @SuppressWarnings("static-access")
-    public <T> T getManagedObject(Class<T> managedClass, String queryString)
+    public <T> T getManagedObject(Class<T> managedClass, String query)
     {
-        AllObjects allObject = new AllObjects(_mbsc);
-        allObject.querystring = queryString;
-
-        Set<ObjectName> objectNames = allObject.returnObjects();
+        Set<ObjectName> objectNames = queryObjects(query);
 
         _test.assertNotNull("Null ObjectName Set returned", objectNames);
         _test.assertEquals("More than one " + managedClass + " returned", 1, objectNames.size());
@@ -321,6 +324,16 @@ public class JMXTestUtils
         return MBeanServerInvocationHandler.newProxyInstance(_mbsc, objectName, managedClass, false);
     }
 
+    public <T> List<T> getManagedObjectList(Class<T> managedClass, Set<ObjectName> objectNames)
+    { // Returns one managedClass object per entry of objectNames, each obtained via getManagedObject.
+        List<T> objects = new ArrayList<T>();
+        for (ObjectName name : objectNames) // follows the Set's iteration order; a null Set throws NullPointerException here
+        {
+            objects.add(getManagedObject(managedClass, name));
+        }
+        return objects;
+    }
+
     public ManagedBroker getManagedBroker(String virtualHost)
     {
         return getManagedObject(ManagedBroker.class, getVirtualHostManagerObjectName(virtualHost));
@@ -355,4 +368,68 @@ public class JMXTestUtils
 		ObjectName objectName = new ObjectName("org.apache.qpid:type=UserManagement,name=UserManagement");
         return getManagedObject(UserManagement.class, objectName);
     }
+
+    /**
+     * Retrieve the {@link ServerInformation} JMX MBean; asserts that exactly one ObjectName matches the query.
+     */
+    public ServerInformation getServerInformation()
+    {
+        // ObjectName pattern used to look up the ServerInformation MBean
+        String query = "org.apache.qpid:type=ServerInformation,name=ServerInformation,*";
+
+        Set<ObjectName> objectNames = queryObjects(query);
+
+        TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+        TestCase.assertEquals("Incorrect number of objects returned", 1, objectNames.size());
+
+        // The assertions above guarantee exactly one entry in objectNames, so return the object for it
+        return getManagedObject(ServerInformation.class, objectNames.iterator().next());
+    }
+
+    /**
+     * Retrieve all {@link ManagedConnection} objects whose ObjectName matches any VirtualHost and any name.
+     */
+    public List<ManagedConnection> getAllManagedConnections()
+    {
+        // ObjectName pattern: Connection MBeans with wildcard VirtualHost and name values
+        String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=*,name=*";
+
+        Set<ObjectName> objectNames = queryObjects(query);
+
+        TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+        return getManagedObjectList(ManagedConnection.class, objectNames);
+    }
+
+    /**
+     * Retrieve all {@link ManagedConnection} objects for a particular virtual host.
+     */
+    public List<ManagedConnection> getManagedConnections(String vhost)
+    {
+        // ObjectName pattern: Connection MBeans for the quoted vhost value, with a wildcard name
+        String query = "org.apache.qpid:type=VirtualHost.Connection,VirtualHost=" + ObjectName.quote(vhost) + ",name=*";
+
+        Set<ObjectName> objectNames = queryObjects(query);
+
+        TestCase.assertNotNull("Null ObjectName Set returned", objectNames);
+
+        return getManagedObjectList(ManagedConnection.class, objectNames);
+    }
+
+    /**
+     * Returns the Set of ObjectNames that _mbsc.queryNames yields for the given query pattern,
+     * or null if there is a problem while performing the query (the stack trace is printed).
+     */
+    private Set<ObjectName> queryObjects(String query)
+    {
+        try
+        {
+            return _mbsc.queryNames(new ObjectName(query), null);
+        }
+        catch (Exception e) // covers both ObjectName construction and the queryNames call
+        {
+            e.printStackTrace();
+            return null;
+        }
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org