You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [15/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker...

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Aug 14 20:40:49 2008
@@ -40,7 +40,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.net.Socket;
 
-
 /**
  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
  * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -85,38 +84,18 @@
             throw new AMQNoTransportForProtocolException(details, null, null);
         }
 
-       /* if (transport == _currentInstance)
-        {
-            if (transport == VM)
-            {
-                if (_currentVMPort == details.getPort())
-                {
-                    return _instance;
-                }
-            }
-            else
-            {
-                return _instance;
-            }
-        }
-
-        _currentInstance = transport;*/
-
-        ITransportConnection instance;
         switch (transport)
         {
             case SOCKET:
-                instance =
-                        new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
-                        {
-                            public IoConnector newSocketConnector()
-                            {
-                                return new ExistingSocketConnector();
-                            }
-                        });
-                break;
+                return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                {
+                    public IoConnector newSocketConnector()
+                    {
+                        return new ExistingSocketConnector();
+                    }
+                });
             case TCP:
-                instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
                 {
                     public IoConnector newSocketConnector()
                     {
@@ -125,8 +104,8 @@
                         if (Boolean.getBoolean("qpidnio"))
                         {
                             _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
-                                                                 ? "Qpid NIO is new default"
-                                                                 : "Sysproperty 'qpidnio' is set"));
+                                                                              ? "Qpid NIO is new default"
+                                                                              : "Sysproperty 'qpidnio' is set"));
                             result = new MultiThreadSocketConnector();
                         }
                         else
@@ -141,18 +120,13 @@
                         return result;
                     }
                 });
-                break;
             case VM:
             {
-                instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
-                break;
+                return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
             }
             default:
-                // FIXME: TGM
-                throw new AMQNoTransportForProtocolException(details, null, null);
+                throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
         }
-
-        return instance;
     }
 
     private static int getTransport(String transport)
@@ -180,13 +154,22 @@
     {
         int port = details.getPort();
 
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            if (AutoCreate)
+            if (!_inVmPipeAddress.containsKey(port))
             {
                 if (AutoCreate)
                 {
-                    createVMBroker(port);
+                    if (AutoCreate)
+                    {
+                        _logger.warn("Auto Creating InVM Broker on port:" + port);
+                        createVMBroker(port);
+                    }
+                    else
+                    {
+                        throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+                                                                           + " does not exist. Auto create disabled.", null);
+                    }
                 }
                 else
                 {
@@ -194,11 +177,6 @@
                                                                        + " does not exist. Auto create disabled.", null);
                 }
             }
-            else
-            {
-                throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
-                                                                   + " does not exist. Auto create disabled.", null);
-            }
         }
 
         return new VmPipeTransportConnection(port);
@@ -214,70 +192,73 @@
 
             config.setThreadModel(ReadWriteThreadModel.getInstance());
         }
-
-        if (!_inVmPipeAddress.containsKey(port))
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
-            IoHandlerAdapter provider = null;
-            try
-            {
-                VmPipeAddress pipe = new VmPipeAddress(port);
-
-                provider = createBrokerInstance(port);
-
-                _acceptor.bind(pipe, provider);
 
-                _inVmPipeAddress.put(port, pipe);
-                _logger.info("Created InVM Qpid.AMQP listening on port " + port);
-            }
-            catch (IOException e)
+            if (!_inVmPipeAddress.containsKey(port))
             {
-                _logger.error("Got IOException.", e);
-
-                // Try and unbind provider
+                _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+                IoHandlerAdapter provider = null;
                 try
                 {
                     VmPipeAddress pipe = new VmPipeAddress(port);
 
-                    try
-                    {
-                        _acceptor.unbind(pipe);
-                    }
-                    catch (Exception ignore)
-                    {
-                        // ignore
-                    }
-
-                    if (provider == null)
-                    {
-                        provider = createBrokerInstance(port);
-                    }
+                    provider = createBrokerInstance(port);
 
                     _acceptor.bind(pipe, provider);
+
                     _inVmPipeAddress.put(port, pipe);
                     _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                 }
-                catch (IOException justUseFirstException)
+                catch (IOException e)
                 {
-                    String because;
-                    if (e.getCause() == null)
+                    _logger.error("Got IOException.", e);
+
+                    // Try and unbind provider
+                    try
                     {
-                        because = e.toString();
+                        VmPipeAddress pipe = new VmPipeAddress(port);
+
+                        try
+                        {
+                            _acceptor.unbind(pipe);
+                        }
+                        catch (Exception ignore)
+                        {
+                            // ignore
+                        }
+
+                        if (provider == null)
+                        {
+                            provider = createBrokerInstance(port);
+                        }
+
+                        _acceptor.bind(pipe, provider);
+                        _inVmPipeAddress.put(port, pipe);
+                        _logger.info("Created InVM Qpid.AMQP listening on port " + port);
                     }
-                    else
+                    catch (IOException justUseFirstException)
                     {
-                        because = e.getCause().toString();
-                    }
+                        String because;
+                        if (e.getCause() == null)
+                        {
+                            because = e.toString();
+                        }
+                        else
+                        {
+                            because = e.getCause().toString();
+                        }
 
-                    throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                        throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+                    }
                 }
+
+            }
+            else
+            {
+                _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
             }
         }
-        else
-        {
-            _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
-        }
-
     }
 
     private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -324,7 +305,7 @@
         _logger.info("Killing all VM Brokers");
         if (_acceptor != null)
         {
-        	_acceptor.unbindAll();
+            _acceptor.unbindAll();
         }
         synchronized (_inVmPipeAddress)
         {
@@ -337,14 +318,17 @@
 
     public static void killVMBroker(int port)
     {
-        VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
-        if (pipe != null)
+        synchronized (_inVmPipeAddress)
         {
-            _logger.info("Killing VM Broker:" + port);
-            _inVmPipeAddress.remove(port);
-            // This does need to be sychronized as otherwise mina can hang
-            // if a new connection is made
-            _acceptor.unbind(pipe);
+            VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+            if (pipe != null)
+            {
+                _logger.info("Killing VM Broker:" + port);
+                _inVmPipeAddress.remove(port);
+                // This does need to be sychronized as otherwise mina can hang
+                // if a new connection is made
+                _acceptor.unbind(pipe);
+            }
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Thu Aug 14 20:40:49 2008
@@ -21,8 +21,10 @@
 package org.apache.qpid.client.util;
 
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -35,7 +37,7 @@
 public class FlowControllingBlockingQueue
 {
     /** This queue is bounded and is used to store messages before being dispatched to the consumer */
-    private final BlockingQueue _queue = new LinkedBlockingQueue();
+    private final Queue _queue = new ConcurrentLinkedQueue();
 
     private final int _flowControlHighThreshold;
     private final int _flowControlLowThreshold;
@@ -71,7 +73,17 @@
 
     public Object take() throws InterruptedException
     {
-        Object o = _queue.take();
+        Object o = _queue.poll();
+        if(o == null)
+        {
+            synchronized(this)
+            {
+                while((o = _queue.poll())==null)
+                {
+                    wait();
+                }
+            }
+        }
         if (_listener != null)
         {
             synchronized (_listener)
@@ -88,7 +100,12 @@
 
     public void add(Object o)
     {
-        _queue.add(o);
+        synchronized(this)
+        {
+            _queue.add(o);
+
+            notifyAll();
+        }
         if (_listener != null)
         {
             synchronized (_listener)

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Thu Aug 14 20:40:49 2008
@@ -48,7 +48,7 @@
     public static final long DEFAULT_CONNECT_TIMEOUT = 30000L;
     public static final boolean USE_SSL_DEFAULT = false;
 
-    // pulled these properties from the new BrokerDetails class in the qpidity package
+    // pulled these properties from the new BrokerDetails class in the qpid package
     public static final String PROTOCOL_TCP = "tcp";
     public static final String PROTOCOL_TLS = "tls";
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/Message.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/Message.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/Message.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/jms/Message.java Thu Aug 14 20:40:49 2008
@@ -24,5 +24,7 @@
 
 public interface Message extends javax.jms.Message
 {
+    public static final String JMS_TYPE = "x-jms-type";
+
     public void acknowledgeThis() throws JMSException;
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Thu Aug 14 20:40:49 2008
@@ -26,21 +26,21 @@
 {
     public static JMSTextMessage newJMSTextMessage() throws JMSException
     {
-        return new JMSTextMessage();
+        return new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
     }
 
     public static JMSBytesMessage newJMSBytesMessage() throws JMSException
     {
-        return new JMSBytesMessage();
+        return new JMSBytesMessage(AMQMessageDelegateFactory.FACTORY_0_8);
     }
 
     public static JMSMapMessage newJMSMapMessage() throws JMSException
     {
-        return new JMSMapMessage();
+        return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8);
     }
 
     public static JMSStreamMessage newJMSStreamMessage()
     {
-        return new JMSStreamMessage();
+        return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Thu Aug 14 20:40:49 2008
@@ -27,12 +27,10 @@
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,12 +46,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager,  ChannelCloseBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session,  ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
 
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Thu Aug 14 20:40:49 2008
@@ -33,11 +33,12 @@
 
     public void testFailoverURL() throws URLSyntaxException
     {
-        String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin'";
+        String url = "amqp://ritchiem:bob@/test?brokerlist='tcp://localhost:5672;tcp://fancyserver:3000/',failover='roundrobin?cyclecount='100''";
 
         ConnectionURL connectionurl = new AMQConnectionURL(url);
 
         assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
+        assertEquals("100", connectionurl.getFailoverOption(ConnectionURL.OPTIONS_FAILOVER_CYCLE));        
         assertTrue(connectionurl.getUsername().equals("ritchiem"));
         assertTrue(connectionurl.getPassword().equals("bob"));
         assertTrue(connectionurl.getVirtualHost().equals("/test"));
@@ -276,7 +277,7 @@
 
     public void testSingleTransportMultiOptionURL() throws URLSyntaxException
     {
-        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672',routingkey='jim',timeout='200',immediatedelivery='true'";
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
 
         ConnectionURL connectionurl = new AMQConnectionURL(url);
 
@@ -493,8 +494,38 @@
         }
     }
 
+    public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getFailoverMethod() == null);
+        assertTrue(connectionurl.getUsername().equals("guest"));
+        assertTrue(connectionurl.getPassword().equals("guest"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+        assertTrue(connectionurl.getBrokerCount() == 1);
+
+        BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+        assertTrue(service.getTransport().equals("tcp"));
+
+        
+        assertTrue(service.getHost().equals("localhost"));
+        assertTrue(service.getPort() == 5672);
+        assertEquals("jim",service.getProperty("foo"));
+        assertEquals("bob",service.getProperty("bar"));
+        assertEquals("jimmy",service.getProperty("fred"));
+
+        assertTrue(connectionurl.getOption("routingkey").equals("jim"));
+        assertTrue(connectionurl.getOption("timeout").equals("200"));
+        assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
+    }
+
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(ConnectionURLTest.class);
     }
 }
+

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Thu Aug 14 20:40:49 2008
@@ -23,7 +23,6 @@
 import junit.framework.TestCase;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.test.unit.basic.PropertyValueTest;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Thu Aug 14 20:40:49 2008
@@ -20,20 +20,20 @@
  */
 package org.apache.qpid.test.unit.message;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.TextMessage;
+import javax.jms.*;
 
 import junit.framework.TestCase;
 
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.JMSMapMessage;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.*;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import java.util.Map;
 
 
 public class MessageConverterTest extends TestCase
@@ -47,36 +47,38 @@
     protected JMSTextMessage testTextMessage;
 
     protected JMSMapMessage testMapMessage;
+    private AMQSession _session = new TestAMQSession();
+
 
     protected void setUp() throws Exception
     {
         super.setUp();
-        testTextMessage = new JMSTextMessage();
+        testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
 
         //Set Message Text
         testTextMessage.setText("testTextMessage text");
         setMessageProperties(testTextMessage);
 
-        testMapMessage = new JMSMapMessage();
+        testMapMessage = new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_8);
         testMapMessage.setString("testMapString", "testMapStringValue");
         testMapMessage.setDouble("testMapDouble", Double.MAX_VALUE);
     }
 
     public void testSetProperties() throws Exception
     {
-        AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+        AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage();
         mesagePropertiesTest(testTextMessage, newMessage);
     }
 
     public void testJMSTextMessageConversion() throws Exception
     {
-        AbstractJMSMessage newMessage = new MessageConverter((TextMessage) testTextMessage).getConvertedMessage();
+        AbstractJMSMessage newMessage = new MessageConverter(_session, (TextMessage) testTextMessage).getConvertedMessage();
         assertEquals("Converted message text mismatch", ((JMSTextMessage) newMessage).getText(), testTextMessage.getText());
     }
 
     public void testJMSMapMessageConversion() throws Exception
     {
-        AbstractJMSMessage newMessage = new MessageConverter((MapMessage) testMapMessage).getConvertedMessage();
+        AbstractJMSMessage newMessage = new MessageConverter(_session, (MapMessage) testMapMessage).getConvertedMessage();
         assertEquals("Converted map message String mismatch", ((JMSMapMessage) newMessage).getString("testMapString"),
                      testMapMessage.getString("testMapString"));
         assertEquals("Converted map message Double mismatch", ((JMSMapMessage) newMessage).getDouble("testMapDouble"),

Modified: incubator/qpid/branches/qpid.0-10/java/common.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/common.xml Thu Aug 14 20:40:49 2008
@@ -47,6 +47,8 @@
   <property name="tasks.classes"         location="${tasks}/classes"/>
   <property name="tasks.src"             location="${tasks}/src"/>
 
+  <property name="javac.compiler.args"   value=""/>
+
   <macrodef name="indirect">
     <attribute name="name"/>
     <attribute name="variable"/>
@@ -75,8 +77,9 @@
   </macrodef>
 
   <mkdir dir="${tasks.classes}"/>
-  <javac srcdir="${tasks.src}" destdir="${tasks.classes}"
-         classpath="${java.class.path}"/>
+  <javac source="${java.source}" target="${java.target}" srcdir="${tasks.src}" destdir="${tasks.classes}"  classpath="${java.class.path}">
+    <compilerarg line="${javac.compiler.args}"/>
+  </javac>
 
   <taskdef name="map" classname="org.apache.qpid.tasks.Map"
            classpath="${tasks.classes}"/>
@@ -118,7 +121,7 @@
 
       ${build.bin}
 
-  ant test [ -Dtest=&lt;pattern&gt; ] [ report ]
+  ant test [ -Dtest=&lt;pattern&gt; ] [-Dprofile=&lt;profile&gt;] [ report ]
 
     Execute unit tests and place the output in the build results
     directory:
@@ -149,7 +152,9 @@
     default to running all available tests for the project or module
     depending on the current working directory.
 
-    Finally it can be useful to append the report target in order to
+    Test Reports
+
+    It can be useful to append the report target in order to
     generate an html summary of the tests that were just run. The
     following command will run both the MongooseTest and GooseTest
     test cases and generate an html summary of the results:
@@ -159,6 +164,16 @@
     See the documentation for the "ant report" target for more details
     on the generated report.
 
+    Test Profiles
+
+    There are a number of profiles defined for running the test suite.
+    These test profiles define how the test should be run. If the test
+    profile is not specified then 'default.testprofile' is utilised.
+    This runs the system tests against the Java InVM broker. Additional
+    test profiles exists as follows:
+
+        cpp : Runs against the built cpp tree broker.
+
   ant report
 
     The report target will generate an html summary of the current

Modified: incubator/qpid/branches/qpid.0-10/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Composite.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Composite.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Composite.tpl Thu Aug 14 20:40:49 2008
@@ -1,28 +1,30 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.Encodable;
-import org.apache.qpidity.transport.codec.Encoder;
-import org.apache.qpidity.transport.codec.Validator;
+import org.apache.qpid.transport.codec.Decoder;
+import org.apache.qpid.transport.codec.Encodable;
+import org.apache.qpid.transport.codec.Encoder;
 
-import org.apache.qpidity.transport.network.Frame;
+import org.apache.qpid.transport.network.Frame;
 
 ${
 from genutil import *
 
 cls = klass(type)["@name"]
 
+segments = type["segments"]
+
 if type.name in ("control", "command"):
   base = "Method"
   size = 0
   pack = 2
-  if type["segments"]:
+  if segments:
     payload = "true"
   else:
     payload = "false"
@@ -80,12 +82,16 @@
   out("    private $(PACK_TYPES[pack]) packing_flags = 0;\n");
 
 fields = get_fields(type)
-params = get_parameters(fields)
+params = get_parameters(type, fields)
 options = get_options(fields)
 
 for f in fields:
   if not f.empty:
     out("    private $(f.type) $(f.name);\n")
+
+if segments:
+  out("    private Header header;\n")
+  out("    private ByteBuffer body;\n")
 }
 
 ${
@@ -99,7 +105,11 @@
   if f.option: continue
   out("        $(f.set)($(f.name));\n")
 
-if options:
+if segments:
+  out("        setHeader(header);\n")
+  out("        setBody(body);\n")
+
+if options or base == "Method":
   out("""
         for (int i=0; i < _options.length; i++) {
             switch (_options[i]) {
@@ -108,7 +118,11 @@
   for f in options:
     out("            case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n")
 
-  out("""            case NO_OPTION: break;
+  if base == "Method":
+    out("""            case SYNC: this.setSync(true); break;
+            case BATCH: this.setBatch(true); break;
+""")
+  out("""            case NONE: break;
             default: throw new IllegalArgumentException("invalid option: " + _options[i]);
             }
         }
@@ -150,7 +164,6 @@
     }
 
     public final $name $(f.set)($(f.type) value) {
-        $(f.check)
 ${
 if not f.empty:
   out("        this.$(f.name) = value;")
@@ -169,6 +182,44 @@
 """)
 }
 
+${
+if segments:
+    out("""    public final Header getHeader() {
+        return this.header;
+    }
+
+    public final void setHeader(Header header) {
+        this.header = header;
+    }
+
+    public final $name header(Header header) {
+        setHeader(header);
+        return this;
+    }
+
+    public final ByteBuffer getBody() {
+        if (this.body == null)
+        {
+            return null;
+        }
+        else
+        {
+            return this.body.slice();
+        }
+    }
+
+    public final void setBody(ByteBuffer body) {
+        this.body = body;
+    }
+
+    public final $name body(ByteBuffer body)
+    {
+        setBody(body);
+        return this;
+    }
+""")
+}
+
     public void write(Encoder enc)
     {
 ${

Modified: incubator/qpid/branches/qpid.0-10/java/common/Constant.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Constant.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Constant.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Constant.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 ${from genutil import *}
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/Enum.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Enum.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Enum.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Enum.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 public enum $name {
 ${

Modified: incubator/qpid/branches/qpid.0-10/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Invoker.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Invoker.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Invoker.tpl Thu Aug 14 20:40:49 2008
@@ -1,5 +1,6 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -15,8 +16,8 @@
 for c in composites:
   name = cname(c)
   fields = get_fields(c)
-  params = get_parameters(fields)
-  args = get_arguments(fields)
+  params = get_parameters(c, fields)
+  args = get_arguments(c, fields)
   result = c["result"]
   if result:
     if not result["@type"]:
@@ -32,9 +33,9 @@
     jclass = ""
 
   out("""
-     public $jresult $(dromedary(name))($(", ".join(params))) {
-         $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
-     }
+    public final $jresult $(dromedary(name))($(", ".join(params))) {
+        $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+    }
 """)
 }
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/MethodDelegate.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/MethodDelegate.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/MethodDelegate.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/MethodDelegate.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 public abstract class MethodDelegate<C> {
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/Option.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Option.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Option.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Option.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 public enum Option {
 
@@ -15,5 +15,6 @@
       if not options.has_key(option):
         options[option] = None
         out("    $option,\n")}
-    NO_OPTION
+    BATCH,
+    NONE
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/StructFactory.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/StructFactory.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/StructFactory.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/StructFactory.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 class StructFactory {
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/Type.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/Type.tpl?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/Type.tpl (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/Type.tpl Thu Aug 14 20:40:49 2008
@@ -1,4 +1,4 @@
-package org.apache.qpidity.transport;
+package org.apache.qpid.transport;
 
 ${from genutil import *}
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/bin/qpid-run?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/bin/qpid-run (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/bin/qpid-run Thu Aug 14 20:40:49 2008
@@ -37,19 +37,39 @@
   exit 1
 }
 
+OFF=0
+WARN=1
+INFO=2
+
+if [ -z "$QPID_RUN_LOG" ]; then
+    QPID_RUN_LOG=$OFF
+fi
+
+log() {
+  if [ "$1" -le "$QPID_RUN_LOG" ]; then
+    shift
+    echo "$@"
+  fi
+}
+
 if [ -z $AMQJ_LOGGING_LEVEL ]; then
     export AMQJ_LOGGING_LEVEL=info
 fi
 
 if [ -z "$QPID_HOME" ]; then
-  die "QPID_HOME must be set"
+    export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+    export PATH=${PATH}:${QPID_HOME}/bin
 fi
 
 if [ -z "$QPID_WORK" ]; then
-    echo Setting QPID_WORK to $HOME as default
+    log $INFO Setting QPID_WORK to $HOME as default
     QPID_WORK=$HOME
 fi
 
+if [ -z "$JAVA" ]; then
+    JAVA=java
+fi
+
 if $cygwin; then
   QPID_HOME=$(cygpath -w $QPID_HOME)
   QPID_WORK=$(cygpath -w $QPID_WORK)
@@ -64,10 +84,10 @@
 #Using X character to avoid probs with empty strings
 if [ -n "$QPID_LOG_PREFIX" ]; then
     if [ "X$QPID_LOG_PREFIX" = "XPID" ]; then
-        echo Using pid in qpid log name prefix
+        log $INFO Using pid in qpid log name prefix
         LOG_PREFIX=" -Dlogprefix=$$"
     else
-        echo Using qpid logprefix property
+        log $INFO Using qpid logprefix property
         LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
     fi
     SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}"
@@ -75,16 +95,16 @@
 
 if [ -n "$QPID_LOG_SUFFIX" ]; then
     if [ "X$QPID_LOG_SUFFIX" = "XPID" ]; then
-        echo Using pid in qpid log name suffix
+        log $INFO Using pid in qpid log name suffix
         LOG_SUFFIX=" -Dlogsuffix=$$"
     else
-        echo Using qpig logsuffix property
+        log $INFO Using qpig logsuffix property
         LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
     fi
     SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}"
 fi
 
-echo System Properties set to $SYSTEM_PROPS
+log $INFO System Properties set to $SYSTEM_PROPS
 
 program=$(basename $0)
 sourced=${BASH_SOURCE[0]}
@@ -109,26 +129,26 @@
 #Use QPID_CLASSPATH if set
 if [ -n "$QPID_CLASSPATH" ]; then
     export CLASSPATH=$QPID_CLASSPATH
-    echo "Using QPID_CLASSPATH" $QPID_CLASSPATH
+    log $INFO "Using QPID_CLASSPATH" $QPID_CLASSPATH
 else
-    echo "Warning: Qpid classpath not set. CLASSPATH must include qpid jars."
+    log $WARN "Warning: Qpid classpath not set. CLASSPATH must include qpid jars."
 fi
 
 #Use QPID_JAVA_GC if set
 if [ -n "$QPID_JAVA_GC" ]; then
     export JAVA_GC=$QPID_JAVA_GC
-    echo "Using QPID_JAVA_GC setting" $QPID_JAVA_GC
+    log $INFO "Using QPID_JAVA_GC setting" $QPID_JAVA_GC
 else
-    echo "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC
+    log $INFO "Info: QPID_JAVA_GC not set. Defaulting to JAVA_GC" $JAVA_GC
 fi
 
 
 #Use QPID_JAVA_MEM if set
 if [ -n "$QPID_JAVA_MEM" ]; then
     export JAVA_MEM=$QPID_JAVA_MEM
-    echo "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM
+    log $INFO "Using QPID_JAVA_MEM setting" $QPID_JAVA_MEM
 else
-    echo "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM
+    log $INFO "Info: QPID_JAVA_MEM not set. Defaulting to JAVA_MEM" $JAVA_MEM
 fi
 
 declare -a RUN_ARGS JAVA_ARGS
@@ -172,7 +192,7 @@
       ;;
     -run:jpda)
 #USAGE: adds debugging options to the java command, use
-#USAGE: JDPA_TRANSPORT and JPDA_ADDRESS to customize the debugging
+#USAGE: JPDA_TRANSPORT and JPDA_ADDRESS to customize the debugging
 #USAGE: behavior and use JPDA_OPTS to override it entirely
       if [ -z "$JPDA_OPTS" ]; then
         JPDA_OPTS="-Xdebug -Xrunjdwp:transport=${JPDA_TRANSPORT:-dt_socket},address=${JPDA_ADDRESS:-8000},server=y,suspend=n"

Modified: incubator/qpid/branches/qpid.0-10/java/common/codegen
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/codegen?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/codegen (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/codegen Thu Aug 14 20:40:49 2008
@@ -7,7 +7,7 @@
 out_dir = sys.argv[1]
 spec_file = sys.argv[2]
 tpl_dir = sys.argv[3]
-pkg_dir = os.path.join(out_dir, "org/apache/qpidity/transport")
+pkg_dir = os.path.join(out_dir, "org/apache/qpid/transport")
 
 if not os.path.exists(pkg_dir):
   os.makedirs(pkg_dir)

Modified: incubator/qpid/branches/qpid.0-10/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/genutil.py?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/genutil.py (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/genutil.py Thu Aug 14 20:40:49 2008
@@ -170,18 +170,15 @@
     if self.type_node.name == "struct":
       self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
       self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
-      self.check = ""
       self.coder = "Struct"
     elif self.type_node.name == "domain":
       self.coder = camel(0, self.prim_type["@name"])
       self.read = "%s.get(dec.read%s())" % (tname, self.coder)
       self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name)
-      self.check = ""
     else:
       self.coder = camel(0, self.type_node["@name"])
       self.read = "dec.read%s()" % self.coder
       self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
-      self.check = "Validator.check%s(value);" % self.coder
     self.type = jtype(self.type_node)
     self.default = DEFAULTS.get(self.type, "null")
     self.has = camel(1, "has", self.name)
@@ -206,7 +203,7 @@
     index += 1
   return fields
 
-def get_parameters(fields):
+def get_parameters(type, fields):
   params = []
   options = False
   for f in fields:
@@ -214,11 +211,14 @@
       options = True
     else:
       params.append("%s %s" % (f.type, f.name))
-  if options:
+  if type["segments"]:
+    params.append("Header header")
+    params.append("ByteBuffer body")
+  if options or type.name in ("control", "command"):
     params.append("Option ... _options")
   return params
 
-def get_arguments(fields):
+def get_arguments(type, fields):
   args = []
   options = False
   for f in fields:
@@ -226,7 +226,10 @@
       options = True
     else:
       args.append(f.name)
-  if options:
+  if type["segments"]:
+    args.append("header")
+    args.append("body")
+  if options or type.name in ("control", "command"):
     args.append("_options")
   return args
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/pom.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/pom.xml Thu Aug 14 20:40:49 2008
@@ -61,7 +61,7 @@
 <!--                                <exec executable="python">
                                   <arg line="generate"/>
                                   <arg line="${generated.path}"/>
-                                  <arg line="org.apache.qpidity"/>
+                                  <arg line="org.apache.qpid"/>
                                   <arg line="${specs.dir}/amqp-transitional.0-10.xml"/>
                                 </exec> -->
                             </tasks>

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/log4j.properties?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/log4j.properties (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/log4j.properties Thu Aug 14 20:40:49 2008
@@ -19,9 +19,12 @@
 log4j.rootLogger=${root.logging.level}
 
 
+log4j.logger.qpid.protocol=${amqj.protocol.logging.level}, console
+log4j.additivity.qpid.protocol=false
 log4j.logger.org.apache.qpid=${amqj.logging.level}, console
 log4j.additivity.org.apache.qpid=false
 
+
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.Threshold=all
 log4j.appender.console.layout=org.apache.log4j.PatternLayout

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/FixedSizeByteBufferAllocator.java Thu Aug 14 20:40:49 2008
@@ -62,7 +62,6 @@
         private static final class FixedSizeByteBuffer extends ByteBuffer
         {
             private java.nio.ByteBuffer buf;
-            private int refCount = 1;
             private int mark = -1;
 
 
@@ -70,36 +69,14 @@
             {
                 this.buf = buf;
                 buf.order( ByteOrder.BIG_ENDIAN );
-                refCount = 1;
             }
 
             public synchronized void acquire()
             {
-                if( refCount <= 0 )
-                {
-                    throw new IllegalStateException( "Already released buffer." );
-                }
-
-                refCount ++;
             }
 
             public void release()
             {
-                synchronized( this )
-                {
-                    if( refCount <= 0 )
-                    {
-                        refCount = 0;
-                        throw new IllegalStateException(
-                                "Already released buffer.  You released the buffer too many times." );
-                    }
-
-                    refCount --;
-                    if( refCount > 0)
-                    {
-                        return;
-                    }
-                }
             }
 
             public java.nio.ByteBuffer buf()
@@ -157,50 +134,12 @@
                 {
                     if( newCapacity > capacity() )
                     {
-                        // Allocate a new buffer and transfer all settings to it.
-                        int pos = position();
-                        int limit = limit();
-                        ByteOrder bo = order();
-
-                        capacity0( newCapacity );
-                        buf.limit( limit );
-                        if( mark >= 0 )
-                        {
-                            buf.position( mark );
-                            buf.mark();
-                        }
-                        buf.position( pos );
-                        buf.order( bo );
+                        throw new IllegalArgumentException();
                     }
 
                     return this;
                 }
 
-            protected void capacity0( int requestedCapacity )
-            {
-                int newCapacity = MINIMUM_CAPACITY;
-                while( newCapacity < requestedCapacity )
-                {
-                    newCapacity <<= 1;
-                }
-
-                java.nio.ByteBuffer oldBuf = this.buf;
-                java.nio.ByteBuffer newBuf;
-                if( isDirect() )
-                {
-                    newBuf = java.nio.ByteBuffer.allocateDirect( newCapacity );
-                }
-                else
-                {
-                    newBuf = java.nio.ByteBuffer.allocate( newCapacity );
-                }
-
-                newBuf.clear();
-                oldBuf.clear();
-                newBuf.put( oldBuf );
-                this.buf = newBuf;
-            }
-
 
 
             public boolean isAutoExpand()

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/mina/common/support/DefaultIoFuture.java Thu Aug 14 20:40:49 2008
@@ -31,7 +31,6 @@
  * A default implementation of {@link org.apache.mina.common.IoFuture}.
  *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
- * @version $Rev: 440259 $, $Date: 2006-09-05 14:01:47 +0900 (화, 05  9월 2006) $
  */
 public class DefaultIoFuture implements IoFuture
 {

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Thu Aug 14 20:40:49 2008
@@ -54,6 +54,6 @@
     public AMQFrame getCloseFrame(int channel)
     {
         MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
-        return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
+        return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Aug 14 20:40:49 2008
@@ -21,6 +21,10 @@
 
 package org.apache.qpid;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
 import org.apache.qpid.protocol.AMQConstant;
 
 /**
@@ -35,6 +39,8 @@
  */
 public class AMQConnectionFailureException extends AMQException
 {
+    Collection<Exception> _exceptions;
+    
 	public AMQConnectionFailureException(String message, Throwable cause)
 	{
 		super(null, message, cause);
@@ -44,4 +50,16 @@
     {
         super(errorCode, message, cause);
     }
+
+    public AMQConnectionFailureException(String message, Collection<Exception> exceptions)
+    {
+        // Blah, I hate ? but java won't let super() be anything other than the first thing, sorry...
+        super (null, message, exceptions.isEmpty() ? null : exceptions.iterator().next());
+        this._exceptions = exceptions;
+    }
+    
+    public Collection<Exception> getLinkedExceptions()
+    {
+        return _exceptions;
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/common/ClientProperties.java Thu Aug 14 20:40:49 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.common;
 
+import org.apache.qpid.framing.AMQShortString;
+
 /**
  * Specifies the available client property types that different clients can use to identify themselves with.
  *
@@ -30,8 +32,21 @@
  */
 public enum ClientProperties
 {
-    instance,
-    product,
-    version,
-    platform
+    instance("instance"),
+    product("product"),
+    version("version"),
+    platform("platform");
+
+    private final AMQShortString _amqShortString;
+
+    private ClientProperties(String name)
+    {
+        _amqShortString = new AMQShortString(name);
+    }
+
+
+    public AMQShortString toAMQShortString()
+    {
+        return _amqShortString;
+    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Thu Aug 14 20:40:49 2008
@@ -50,4 +50,14 @@
         return buffer;
     }
 
+    public java.nio.ByteBuffer toNioByteBuffer()
+    {
+        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+        ByteBuffer buf = ByteBuffer.wrap(buffer);
+        writePayload(buf);    
+        buffer.flip();
+        return buffer;
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Aug 14 20:40:49 2008
@@ -111,6 +111,8 @@
     private final byte[] _data;
     private final int _offset;
     private int _hashCode;
+    private String _asString = null;
+
     private final int _length;
     private static final char[] EMPTY_CHAR_ARRAY = new char[0];
     
@@ -137,7 +139,7 @@
     public AMQShortString(String data)
     {
         this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-
+        _asString = data;
     }
 
     public AMQShortString(char[] data)
@@ -224,7 +226,6 @@
         }
     }
 
-
     /**
      * Get the length of the short string
      * @return length of the underlying byte array
@@ -419,9 +420,14 @@
         return chars;
     }
 
+
     public String asString()
     {
-        return new String(asChars());
+        if (_asString == null)
+        {
+            _asString = new String(asChars());
+        }
+        return _asString;
     }
 
     public boolean equals(Object o)
@@ -464,13 +470,49 @@
             return false;
         }
 
-        if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+        final int hashCode = _hashCode;
+
+        final int otherHashCode = otherString._hashCode;
+
+        if ((hashCode != 0) && (otherHashCode != 0) && (hashCode != otherHashCode))
+        {
+            return false;
+        }
+
+        final int length = _length;
+
+        if(length != otherString._length)
         {
             return false;
         }
 
-        return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
-                || Arrays.equals(getBytes(),otherString.getBytes());
+
+        final byte[] data = _data;
+
+        final byte[] otherData = otherString._data;
+
+        final int offset = _offset;
+
+        final int otherOffset = otherString._offset;
+
+        if(offset == 0 && otherOffset == 0 && length == data.length && length == otherData.length)
+        {
+            return Arrays.equals(data, otherData);
+        }
+        else
+        {
+            int thisIdx = offset;
+            int otherIdx = otherOffset;
+            for(int i = length;  i-- != 0; )
+            {
+                if(!(data[thisIdx++] == otherData[otherIdx++]))
+                {
+                    return false;
+                }
+            }
+        }
+
+        return true;
 
     }
 
@@ -718,4 +760,17 @@
         return false;  //To change body of created methods use File | Settings | File Templates.
     }
 
+
+    public static void main(String args[])
+    {
+        AMQShortString s = new AMQShortString("a.b.c.d.e.f.g.h.i.j.k");
+        AMQShortString s2 = s.substring(2, 7);
+
+        AMQShortStringTokenizer t = s2.tokenize((byte) '.');
+        while(t.hasMoreTokens())
+        {
+            System.err.println(t.nextToken());
+        }
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Thu Aug 14 20:40:49 2008
@@ -93,4 +93,24 @@
     {
         return "[" + getType() + ": " + getValue() + "]";
     }
+
+
+    public boolean equals(Object o)
+    {
+        if(o instanceof AMQTypedValue)
+        {
+            AMQTypedValue other = (AMQTypedValue) o;
+            return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    public int hashCode()
+    {
+        return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Thu Aug 14 20:40:49 2008
@@ -74,7 +74,7 @@
         buffer.skip((int) length);
     }
 
-    private AMQTypedValue getProperty(AMQShortString string)
+    public AMQTypedValue getProperty(AMQShortString string)
     {
         checkPropertyName(string);
 
@@ -891,6 +891,20 @@
         return keys;
     }
 
+    public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
+    {
+        if(_encodedForm != null)
+        {
+            return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
+        }
+        else
+        {
+            initMapIfNecessary();
+            return _properties.entrySet().iterator();
+        }
+    }
+
+
     public Object get(AMQShortString key)
     {
 
@@ -1050,6 +1064,95 @@
         }
     }
 
+    private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
+    {
+        private final AMQTypedValue _value;
+        private final AMQShortString _key;
+
+        public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
+        {
+            _key = key;
+            _value = value;
+        }
+
+        public AMQShortString getKey()
+        {
+            return _key;
+        }
+
+        public AMQTypedValue getValue()
+        {
+            return _value;
+        }
+
+        public AMQTypedValue setValue(final AMQTypedValue value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean equals(Object o)
+        {
+            if(o instanceof FieldTableEntry)
+            {
+                FieldTableEntry other = (FieldTableEntry) o;
+                return (_key == null ? other._key == null : _key.equals(other._key))
+                       && (_value == null ? other._value == null : _value.equals(other._value));
+            }
+            else
+            {
+                return false;
+            }
+        }
+
+        public int hashCode()
+        {
+            return (getKey()==null   ? 0 : getKey().hashCode())
+                   ^ (getValue()==null ? 0 : getValue().hashCode());
+        }
+
+    }
+
+
+    private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
+    {
+
+        private final ByteBuffer _buffer;
+        private int _expectedRemaining;
+
+        public FieldTableIterator(ByteBuffer buffer, int length)
+        {
+            _buffer = buffer;
+            _expectedRemaining = buffer.remaining() - length;
+        }
+
+        public boolean hasNext()
+        {
+            return (_buffer.remaining() > _expectedRemaining);
+        }
+
+        public Map.Entry<AMQShortString, AMQTypedValue> next()
+        {
+            if(hasNext())
+            {
+                final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
+                AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
+                return new FieldTableEntry(key, value);
+            }
+            else
+            {
+                return null;
+            }
+        }
+
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+
+
+
     public int hashCode()
     {
         initMapIfNecessary();

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/Job.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/Job.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/Job.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/Job.java Thu Aug 14 20:40:49 2008
@@ -50,7 +50,7 @@
  *
  * @todo For better re-usability could make the completion handler optional. Only run it when one is set.
  */
-public class Job implements Runnable
+public class Job implements ReadWriteRunnable
 {
     /** The maximum number of events to process per run of the job. More events than this may be queued in the job. */
     private final int _maxEvents;
@@ -67,18 +67,22 @@
     /** Holds the completion continuation, called upon completion of a run of the job. */
     private final JobCompletionHandler _completionHandler;
 
+    private final boolean _readJob;
+
     /**
      * Creates a new job that aggregates many continuations together.
      *
      * @param session           The Mina session.
      * @param completionHandler The per job run, terminal continuation.
      * @param maxEvents         The maximum number of aggregated continuations to process per run of the job.
+     * @param readJob
      */
-    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents)
+    Job(IoSession session, JobCompletionHandler completionHandler, int maxEvents, final boolean readJob)
     {
         _session = session;
         _completionHandler = completionHandler;
         _maxEvents = maxEvents;
+        _readJob = readJob;
     }
 
     /**
@@ -157,6 +161,22 @@
         }
     }
 
+    public boolean isReadJob()
+    {
+        return _readJob;
+    }
+
+    public boolean isRead()
+    {
+        return _readJob;
+    }
+
+    public boolean isWrite()
+    {
+        return !_readJob;
+    }
+
+
     /**
      * Another interface for a continuation.
      *

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/PoolingFilter.java Thu Aug 14 20:40:49 2008
@@ -60,24 +60,6 @@
  *     <td> {@link Job}, {@link Job.JobCompletionHandler}
  * </table>
  *
- * @todo This seems a bit bizarre. ReadWriteThreadModel creates seperate pooling filters for read and write events.
- *       The pooling filters themselves batch read and write events into jobs, but hand these jobs to a common thread
- *       pool for execution. So the same thread pool ends up handling read and write events, albeit with many threads
- *       so there is concurrency. But why go to the trouble of seperating out the read and write events in that case?
- *       Why not just batch them into jobs together? Perhaps its so that seperate thread pools could be used for these
- *       stages.
- *
- * @todo Why set an event limit of 10 on the Job? This also seems bizarre, as the job can have more than 10 events in
- *       it. Its just that it runs them 10 at a time, but the completion hander here checks if there are more to run
- *       and trips off another batch of 10 until they are all done. Why not just have a straight forward
- *       consumer/producer queue scenario without the batches of 10? So instead of having many jobs with batches of 10
- *       in them, just have one queue of events and worker threads taking the next event. There will be coordination
- *       between worker threads and new events arriving on the job anyway, so the simpler scenario may have the same
- *       amount of contention. I can see that the batches of 10 is done, so that no job is allowed to hog the worker
- *       pool for too long. I'm not convinced this fairly complex scheme will actually add anything, and it might be
- *       better to encapsulate it under a Queue interface anyway, so that different queue implementations can easily
- *       be substituted in.
- *
  * @todo The static helper methods are pointless. Could just call new.
  */
 public abstract class PoolingFilter extends IoFilterAdapter implements Job.JobCompletionHandler
@@ -96,17 +78,20 @@
 
     private final int _maxEvents;
 
+    private final boolean _readFilter;
+
     /**
      * Creates a named pooling filter, on the specified shared thread pool.
      *
      * @param refCountingPool The thread pool reference.
      * @param name            The identifying name of the filter type.
      */
-    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents)
+    public PoolingFilter(ReferenceCountingExecutorService refCountingPool, String name, int maxEvents, boolean readFilter)
     {
         _poolReference = refCountingPool;
         _name = name;
         _maxEvents = maxEvents;
+        _readFilter = readFilter;
     }
 
     /**
@@ -167,7 +152,6 @@
     void fireAsynchEvent(Job job, Event event)
     {
 
-        // job.acquire(); //prevents this job being removed from _jobs
         job.add(event);
 
         final ExecutorService pool = _poolReference.getPool();
@@ -201,7 +185,7 @@
      */
     public void createNewJobForSession(IoSession session)
     {
-        Job job = new Job(session, this, MAX_JOB_EVENTS);
+        Job job = new Job(session, this, MAX_JOB_EVENTS,_readFilter);
         session.setAttribute(_name, job);
     }
 
@@ -433,7 +417,7 @@
          */
         public AsynchReadPoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_read_events", MAX_JOB_EVENTS),true);
         }
 
         /**
@@ -476,7 +460,7 @@
          */
         public AsynchWritePoolingFilter(ReferenceCountingExecutorService refCountingPool, String name)
         {
-            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS));
+            super(refCountingPool, name, Integer.getInteger("amqj.server.read_write_pool.max_write_events", MAX_JOB_EVENTS),false);
         }
 
         /**

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/pool/ReferenceCountingExecutorService.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,9 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * ReferenceCountingExecutorService wraps an ExecutorService in order to provide shared reference to it. It counts
@@ -84,6 +87,8 @@
     /** Holds the number of executor threads to create. */
     private int _poolSize = Integer.getInteger("amqj.read_write_pool_size", DEFAULT_POOL_SIZE);
 
+    private final boolean _useBiasedPool = Boolean.getBoolean("org.apache.qpid.use_write_biased_pool");
+
     /**
      * Retrieves the singleton instance of this reference counter.
      *
@@ -105,15 +110,28 @@
      *
      * @return An executor service.
      */
-    ExecutorService acquireExecutorService()
+    public ExecutorService acquireExecutorService()
     {
         synchronized (_lock)
         {
             if (_refCount++ == 0)
             {
-                _pool = Executors.newFixedThreadPool(_poolSize);
+//                _pool = Executors.newFixedThreadPool(_poolSize);
+
+                // Use a job queue that biases to writes
+                if(_useBiasedPool)
+                {
+                    _pool =  new ThreadPoolExecutor(_poolSize, _poolSize,
+                                          0L, TimeUnit.MILLISECONDS,
+                                          new ReadWriteJobQueue());
+                }
+                else
+                {
+                    _pool = Executors.newFixedThreadPool(_poolSize);
+                }
             }
 
+
             return _pool;
         }
     }
@@ -122,7 +140,7 @@
      * Releases a reference to a shared executor service, decrementing the reference count. If the refence count falls
      * to zero, the executor service is shut down.
      */
-    void releaseExecutorService()
+    public void releaseExecutorService()
     {
         synchronized (_lock)
         {

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Thu Aug 14 20:40:49 2008
@@ -21,8 +21,12 @@
 package org.apache.qpid.protocol;
 
 import org.apache.qpid.framing.*;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.AMQException;
 
+import java.nio.ByteBuffer;
+
+
 /**
  * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
  * callers of the version of the AMQP protocol that they are able to work with.
@@ -54,4 +58,7 @@
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException;
 
 
+    public void setSender(Sender<ByteBuffer> sender);
+    public void init();
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/BindingURLImpl.java Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
  */
 package org.apache.qpid.url;
 
-import org.apache.qpidity.exchange.ExchangeDefaults;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURL.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURL.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURL.java Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
  */
 package org.apache.qpid.url;
 
-import org.apache.qpidity.BrokerDetails;
+import org.apache.qpid.BrokerDetails;
 
 import java.util.List;
 

Modified: incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/common/src/main/java/org/apache/qpid/url/QpidURLImpl.java Thu Aug 14 20:40:49 2008
@@ -17,8 +17,8 @@
  */
 package org.apache.qpid.url;
 
-import org.apache.qpidity.BrokerDetails;
-import org.apache.qpidity.BrokerDetailsImpl;
+import org.apache.qpid.BrokerDetails;
+import org.apache.qpid.BrokerDetailsImpl;
 
 import java.net.MalformedURLException;
 import java.util.ArrayList;

Modified: incubator/qpid/branches/qpid.0-10/java/cpp.async.testprofile
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/cpp.async.testprofile?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/cpp.async.testprofile (original)
+++ incubator/qpid/branches/qpid.0-10/java/cpp.async.testprofile Thu Aug 14 20:40:49 2008
@@ -1,18 +1,3 @@
 broker.version=0-10
 broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --load-module ${project.root}/../../cppStore/cpp/lib/.libs/libbdbstore.so --auth no
-broker.clean=${project.root}/clean-dir ${build.data}
-broker.ready=Listening on TCP port
-java.naming.provider.url=${project.root}/test-provider.properties
-max_prefetch=1000
-test.excludes=true
 test.excludesfile=${project.root}/010ExcludeList-store
-log=info
-amqj.logging.level=$log
-root.logging.level=$log
-log4j.configuration=file://${project.root}/log4j-test.xml
-test.fork=no
-test.mem=512M
-test=*Test
-haltonfailure=no
-haltonerror=no
-exclude.modules=none

Modified: incubator/qpid/branches/qpid.0-10/java/cpp.testprofile
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/cpp.testprofile?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/cpp.testprofile (original)
+++ incubator/qpid/branches/qpid.0-10/java/cpp.testprofile Thu Aug 14 20:40:49 2008
@@ -1,18 +1,3 @@
 broker.version=0-10
 broker=${project.root}/../cpp/src/qpidd --data-dir ${build.data} -t --auth no
-broker.clean=${project.root}/clean-dir ${build.data}
-broker.ready=Listening on TCP port
-java.naming.provider.url=${project.root}/test-provider.properties
-max_prefetch=1000
-test.excludes=true
 test.excludesfile=${project.root}/010ExcludeList
-log=info
-amqj.logging.level=$log
-root.logging.level=$log
-log4j.configuration=file://${project.root}/log4j-test.xml
-test.fork=no
-test.mem=512M
-test=*Test
-haltonfailure=no
-haltonerror=no
-exclude.modules=none

Modified: incubator/qpid/branches/qpid.0-10/java/default.testprofile
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/default.testprofile?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/default.testprofile (original)
+++ incubator/qpid/branches/qpid.0-10/java/default.testprofile Thu Aug 14 20:40:49 2008
@@ -1,14 +1,20 @@
 broker.version=0-8
 broker=vm
 broker.clean=${project.root}/clean-dir ${build.data}
+broker.ready=Listening on TCP port
+
 java.naming.provider.url=${project.root}/test-provider.properties
-test.excludes=true
-test.excludesfile=${project.root}/08ExcludeList
-log=info
 max_prefetch=1000
-amqj.logging.level=$log
-root.logging.level=$log
+
+log=debug
+amqj.logging.level=${log}
+amqj.protocol.logging.level=${log}
+root.logging.level=${log}
 log4j.configuration=file:///${project.root}/log4j-test.xml
+log4j.debug=false
+
+test.excludes=true
+test.excludesfile=${project.root}/08ExcludeList
 test.fork=no
 test.mem=512M
 test=*Test

Modified: incubator/qpid/branches/qpid.0-10/java/junit-toolkit/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/junit-toolkit/pom.xml?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/junit-toolkit/pom.xml (original)
+++ incubator/qpid/branches/qpid.0-10/java/junit-toolkit/pom.xml Thu Aug 14 20:40:49 2008
@@ -52,6 +52,18 @@
                 <version>1.2.12</version>
             </dependency>
         
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>1.4.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>1.4.0</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
@@ -65,6 +77,19 @@
             <groupId>log4j</groupId>
             <artifactId>log4j</artifactId>
         </dependency>
+
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>1.4.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>1.4.0</version>
+            </dependency>
+
     </dependencies>
 
     <build>

Modified: incubator/qpid/branches/qpid.0-10/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/junit-toolkit/src/main/org/apache/qpid/junit/concurrency/ThreadTestCoordinator.java Thu Aug 14 20:40:49 2008
@@ -20,7 +20,8 @@
  */
 package org.apache.qpid.junit.concurrency;
 
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -68,7 +69,7 @@
 public class ThreadTestCoordinator
 {
     /** Used for logging. */
-    private static final Logger log = Logger.getLogger(ThreadTestCoordinator.class);
+    private static final Logger log = LoggerFactory.getLogger(ThreadTestCoordinator.class);
 
     /** Keeps track of the test threads by their ids. */
     private TestRunnable[] testThreads; // = new TestRunnable[2];