You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gr...@apache.org on 2010/10/13 17:06:27 UTC

svn commit: r1022127 [8/15] - in /qpid/branches/grkvlt-network-20101013/qpid/java: ./ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ broker-plugins/access-control/src/test/java/org/apache/qpid/server/securit...

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Oct 13 15:05:29 2010
@@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U extends UnprocessedMessage & AMQSession.Dispatchable> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
@@ -805,21 +805,13 @@ public abstract class BasicMessageConsum
      */
     Long getLastDelivered()
     {
-        if (!_receivedDeliveryTags.isEmpty())
+        Long lastDeliveryTag = null;
+        while (!_receivedDeliveryTags.isEmpty());
         {
-            Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                lastDeliveryTag = _receivedDeliveryTags.poll();
-            }
-
-            assert _receivedDeliveryTags.isEmpty();
-
-            return lastDeliveryTag;
+            lastDeliveryTag = _receivedDeliveryTags.poll();
         }
 
-        return null;
+        return lastDeliveryTag;
     }
 
     /**

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Oct 13 15:05:29 2010
@@ -19,6 +19,7 @@ package org.apache.qpid.client;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
 import org.apache.qpid.client.AMQDestination.DestSyntax;
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Oct 13 15:05:29 2010
@@ -274,93 +274,55 @@ public abstract class BasicMessageProduc
 
     public void send(Message message) throws JMSException
     {
-        checkPreConditions();
-        checkInitialDestination();
-
-
-        synchronized (_connection.getFailoverMutex())
-        {
-            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
-        }
+        send(message, _deliveryMode);
     }
 
     public void send(Message message, int deliveryMode) throws JMSException
     {
-        checkPreConditions();
-        checkInitialDestination();
-
-        synchronized (_connection.getFailoverMutex())
-        {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
-        }
+        send(message, deliveryMode, _immediate);
     }
 
     public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
     {
-        checkPreConditions();
-        checkInitialDestination();
-        synchronized (_connection.getFailoverMutex())
-        {
-            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
-        }
+        send(message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
     }
 
     public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
+        send(message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+    }
+
+    public void send(Message message, int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate) throws JMSException
+    {
         checkPreConditions();
         checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+            sendImpl(_destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
         }
     }
 
     public void send(Destination destination, Message message) throws JMSException
     {
-        checkPreConditions();
-        checkDestination(destination);
-        synchronized (_connection.getFailoverMutex())
-        {
-            validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
-                     _immediate);
-        }
+        send(destination, message, _deliveryMode, _messagePriority, _timeToLive);
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
         throws JMSException
     {
-        checkPreConditions();
-        checkDestination(destination);
-        synchronized (_connection.getFailoverMutex())
-        {
-            validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
-        }
+        send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory);
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
                      boolean mandatory) throws JMSException
     {
-        checkPreConditions();
-        checkDestination(destination);
-        synchronized (_connection.getFailoverMutex())
-        {
-            validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
-        }
+        send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
                      boolean mandatory, boolean immediate) throws JMSException
     {
-        checkPreConditions();
-        checkDestination(destination);
-        synchronized (_connection.getFailoverMutex())
-        {
-            validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
-        }
+        send((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
@@ -439,12 +401,6 @@ public abstract class BasicMessageProduc
         }
     }
 
-    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                            boolean mandatory, boolean immediate) throws JMSException
-    {
-        sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
-    }
-
     /**
      * The caller of this method must hold the failover mutex.
      *

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Wed Oct 13 15:05:29 2010
@@ -231,14 +231,7 @@ public class FailoverHandler implements 
                 {
                     _logger.info("Failover process failed - exception being propagated by protocol handler");
                     _amqProtocolHandler.setFailoverState(FailoverState.FAILED);
-                    /*try
-                    {*/
                     _amqProtocolHandler.exception(e);
-                    /*}
-                    catch (Exception ex)
-                    {
-                        _logger.error("Error notifying protocol session of error: " + ex, ex);
-                    }*/
                 }
             }
         }

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Wed Oct 13 15:05:29 2010
@@ -77,7 +77,7 @@ public class ConnectionCloseMethodHandle
                 {
                     _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
 
-                    error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+                    error = new AMQAuthenticationException(reason == null ? null : reason.toString(), null);
                 }
                 else if (errorCode == AMQConstant.ACCESS_REFUSED)
                 {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Wed Oct 13 15:05:29 2010
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.client.message;
 
+import java.lang.ref.SoftReference;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -38,7 +39,6 @@ import javax.jms.Session;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.collections.ReferenceMap;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQSession_0_10;
@@ -61,7 +61,7 @@ import org.apache.qpid.transport.ReplyTo
  */
 public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
 {
-    private static final Map<ReplyTo, Destination> _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+    private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
 
     public static final String JMS_TYPE = "x-jms-type";
 
@@ -216,7 +216,8 @@ public class AMQMessageDelegate_0_10 ext
         }
         else
         {
-            Destination dest = _destinationCache.get(replyTo);
+            SoftReference<Destination> ref = _destinationCache.get(replyTo);
+            Destination dest = ref.get();
             if (dest == null)
             {
                 String exchange = replyTo.getExchange();
@@ -225,11 +226,7 @@ public class AMQMessageDelegate_0_10 ext
                 dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
                         routingKey == null ? null : new AMQShortString(routingKey));
 
-
-
-
-
-                _destinationCache.put(replyTo, dest);
+                _destinationCache.put(replyTo, new SoftReference<Destination>(dest));
             }
 
             return dest;
@@ -276,7 +273,7 @@ public class AMQMessageDelegate_0_10 ext
         }
         
         final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
-        _destinationCache.put(replyTo, destination);
+        _destinationCache.put(replyTo, new SoftReference<Destination>(destination));
         _messageProps.setReplyTo(replyTo);
 
     }

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Oct 13 15:05:29 2010
@@ -21,9 +21,11 @@
 
 package org.apache.qpid.client.message;
 
+import java.lang.ref.SoftReference;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -38,7 +40,6 @@ import org.apache.qpid.client.AMQSession
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.client.JMSAMQException;
-import org.apache.qpid.collections.ReferenceMap;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderProperties;
@@ -47,7 +48,7 @@ import org.apache.qpid.url.BindingURL;
 
 public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
 {
-    private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+    private static final Map<String, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<String, SoftReference<Destination>>());
 
     public static final String JMS_TYPE = "x-jms-type";
 
@@ -181,7 +182,8 @@ public class AMQMessageDelegate_0_8 exte
         }
         else
         {
-            Destination dest = (Destination) _destinationCache.get(replyToEncoding);
+            SoftReference<Destination> ref = _destinationCache.get(replyToEncoding);
+            Destination dest = ref.get();
             if (dest == null)
             {
                 try
@@ -194,7 +196,7 @@ public class AMQMessageDelegate_0_8 exte
                     throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
                 }
 
-                _destinationCache.put(replyToEncoding, dest);
+                _destinationCache.put(replyToEncoding, new SoftReference<Destination>(dest));
             }
 
             return dest;
@@ -218,7 +220,7 @@ public class AMQMessageDelegate_0_8 exte
         final AMQDestination amqd = (AMQDestination) destination;
 
         final AMQShortString encodedDestination = amqd.getEncodedName();
-        _destinationCache.put(encodedDestination, destination);
+        _destinationCache.put(encodedDestination.asString(), new SoftReference<Destination>(amqd));
         getContentHeaderProperties().setReplyTo(encodedDestination);
     }
 

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Oct 13 15:05:29 2010
@@ -56,42 +56,44 @@ import org.apache.qpid.framing.Heartbeat
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.pool.Job;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * AMQProtocolHandler is the client side protocol handler for AMQP, it handles all protocol events received from the
- * network by MINA. The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
+ * network by MINA.
+ * 
+ * The primary purpose of AMQProtocolHandler is to translate the generic event model of MINA into the
  * specific event model of AMQP, by revealing the type of the received events (from decoded data), and passing the
  * event on to more specific handlers for the type. In this sense, it channels the richer event model of AMQP,
  * expressed in terms of methods and so on, through the cruder, general purpose event model of MINA, expressed in
  * terms of "message received" and so on.
- *
- * <p/>There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
+ * <p>
+ * There is a 1:1 mapping between an AMQProtocolHandler and an {@link AMQConnection}. The connection class is
  * exposed to the end user of the AMQP client API, and also implements the JMS Connection API, so provides the public
  * API calls through which an individual connection can be manipulated. This protocol handler talks to the network
  * through MINA, in a behind the scenes role; it is not an exposed part of the client API.
- *
- * <p/>There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
+ * <p>
+ * There is a 1:many mapping between an AMQProtocolHandler and a set of {@link AMQSession}s. At the MINA level,
  * there is one session per connection. At the AMQP level there can be many channels which are also called sessions in
  * JMS parlance. The {@link AMQSession}s are managed through an {@link AMQProtocolSession} instance. The protocol
  * session is similar to the MINA per-connection session, except that it can span the lifecycle of multiple MINA sessions
  * in the event of failover. See below for more information about this.
- *
- * <p/>Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
+ * <p>
+ * Mina provides a session container that can be used to store/retrieve arbitrary objects as String named
  * attributes. A more convenient, type-safe, container for session data is provided in the form of
  * {@link AMQProtocolSession}.
- *
- * <p/>A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
+ * <p>
+ * A common way to use MINA is to have a single instance of the event handler, and for MINA to pass in its session
  * object with every event, and for per-connection data to be held in the MINA session (perhaps using a type-safe wrapper
  * as described above). This event handler is different, because dealing with failover complicates things. To the
  * end client of an AMQConnection, a failed over connection is still handled through the same connection instance, but
@@ -99,8 +101,8 @@ import org.slf4j.LoggerFactory;
  * be used to track the state of the fail-over process, because it is destroyed and a new one is created, as the old
  * connection is shutdown and a new one created. For this reason, an AMQProtocolHandler is created per AMQConnection
  * and the protocol session data is held outside of the MINA IOSession.
- *
- * <p/>This handler is responsibile for setting up the filter chain to filter all events for this handler through.
+ * <p>
+ * This handler is responsibile for setting up the filter chain to filter all events for this handler through.
  * The filter chain is set up as a stack of event handers that perform the following functions (working upwards from
  * the network traffic at the bottom), handing off incoming events to an asynchronous thread pool to do the work,
  * optionally handling secure sockets encoding/decoding, encoding/decoding the AMQP format itself.
@@ -118,7 +120,7 @@ import org.slf4j.LoggerFactory;
  * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
  * that lifecycles of the fields match lifecycles of their containing objects.
  */
-public class AMQProtocolHandler implements ProtocolEngine
+public class AMQProtocolHandler implements Receiver<java.nio.ByteBuffer>
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
@@ -170,7 +172,9 @@ public class AMQProtocolHandler implemen
     private Job _readJob;
     private Job _writeJob;
     private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
-    private NetworkDriver _networkDriver;
+    private Sender<ByteBuffer> _sender;
+    private NetworkConnection _network;
+    private NetworkTransport _transport;
     private ProtocolVersion _suggestedProtocolVersion;
 
     private long _writtenBytes;
@@ -194,21 +198,6 @@ public class AMQProtocolHandler implemen
     }
 
     /**
-     * Called when we want to create a new IoTransport session
-     * @param brokerDetail
-     */
-    public void createIoTransportSession(BrokerDetails brokerDetail)
-    {
-        _protocolSession = new AMQProtocolSession(this, _connection);
-        _stateManager.setProtocolSession(_protocolSession);
-        IoTransport.connect_0_9(getProtocolSession(),
-                                brokerDetail.getHost(),
-                                brokerDetail.getPort(),
-                                brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
-        _protocolSession.init();
-    }
-
-    /**
      * Called when the network connection is closed. This can happen, either because the client explicitly requested
      * that the connection be closed, in which case nothing is done, or because the connection died. In the case
      * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -290,7 +279,7 @@ public class AMQProtocolHandler implemen
         //  failover:
         HeartbeatDiagnostics.timeout();
         _logger.warn("Timed out while waiting for heartbeat from peer.");
-        _networkDriver.close();
+        _sender.close();
     }
 
     public void writerIdle()
@@ -312,7 +301,10 @@ public class AMQProtocolHandler implemen
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
                 // this will attempt failover
-                _networkDriver.close();
+                if (_sender != null)
+                {
+	                _sender.close();
+                }
                 closed();
             }
             else
@@ -564,7 +556,7 @@ public class AMQProtocolHandler implemen
         {
             public void run()
             {
-                _networkDriver.send(buf);
+                _sender.send(buf);
             }
         });
         if (PROTOCOL_DEBUG)
@@ -585,7 +577,7 @@ public class AMQProtocolHandler implemen
 
         if (wait)
         {
-            _networkDriver.flush();
+            _sender.flush();
         }
     }
 
@@ -699,7 +691,7 @@ public class AMQProtocolHandler implemen
             try
             {
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _networkDriver.close();
+                _sender.close();
                 closed();
             }
             catch (AMQTimeoutException e)
@@ -714,18 +706,6 @@ public class AMQProtocolHandler implemen
         _poolReference.releaseExecutorService();
     }
 
-    /** @return the number of bytes read from this protocol session */
-    public long getReadBytes()
-    {
-        return _readBytes;
-    }
-
-    /** @return the number of bytes written to this protocol session */
-    public long getWrittenBytes()
-    {
-        return _writtenBytes;
-    }
-
     public void failover(String host, int port)
     {
         _failoverHandler.setHost(host);
@@ -819,17 +799,19 @@ public class AMQProtocolHandler implemen
 
     public SocketAddress getRemoteAddress()
     {
-        return _networkDriver.getRemoteAddress();
+        return _network.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _networkDriver.getLocalAddress();
+        return _transport.getAddress();
     }
 
-    public void setNetworkDriver(NetworkDriver driver)
+    public void connect(NetworkTransport transport, NetworkConnection network)
     {
-        _networkDriver = driver;
+        _transport = transport;
+        _network = network;
+        _sender = network.getSender();
     }
 
     /** @param delay delay in seconds (not ms) */
@@ -837,17 +819,13 @@ public class AMQProtocolHandler implemen
     {
         if (delay > 0)
         {
-            getNetworkDriver().setMaxWriteIdle(delay);
-            getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+//        FIXME
+//            _sender.setMaxWriteIdle(delay);
+//            _sender.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
 
-    public NetworkDriver getNetworkDriver()
-    {
-        return _networkDriver;
-    }
-
     public ProtocolVersion getSuggestedProtocolVersion()
     {
         return _suggestedProtocolVersion;

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Oct 13 15:05:29 2010
@@ -20,27 +20,36 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -79,8 +88,7 @@ public class AMQProtocolSession implemen
     private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
 
     /** Counter to ensure unique queue names */
-    protected int _queueId = 1;
-    protected final Object _queueIdLock = new Object();
+    protected static final AtomicLong _queueIdGenerator = new AtomicLong(0);
 
     private ProtocolVersion _protocolVersion;
 //    private VersionSpecificRegistry _registry =
@@ -380,11 +388,7 @@ public class AMQProtocolSession implemen
 
     protected AMQShortString generateQueueName()
     {
-        int id;
-        synchronized (_queueIdLock)
-        {
-            id = _queueId++;
-        }
+        long id = _queueIdGenerator.incrementAndGet();
         // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
         String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
         String queueName = "tmp_" + localAddress + "_" + id;

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/security/crammd5hashed/CRAMMD5HashedSaslClientFactory.java Wed Oct 13 15:05:29 2010
@@ -36,7 +36,7 @@ public class CRAMMD5HashedSaslClientFact
     public static final String MECHANISM = "CRAM-MD5-HASHED";
 
 
-    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map<String, ?> props, CallbackHandler cbh) throws SaslException
+    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
     {
         for (int i = 0; i < mechanisms.length; i++)
         {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Wed Oct 13 15:05:29 2010
@@ -20,42 +20,21 @@
  */
 package org.apache.qpid.client.util;
 
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
+ * A {@link BlockingQueue} that emits events above a user specified threshold allowing the caller to take action (e.g. flow
  * control) to try to prevent the queue growing (much) further. The underlying queue itself is not bounded therefore the
- * caller is not obliged to react to the events. <p/> This implementation is <b>only</b> safe where we have a single
- * thread adding items and a single (different) thread removing items.
- *
- * @todo Make this implement java.util.Queue and hide the implementation. Then different queue types can be substituted.
+ * caller is not obliged to react to the events.
  */
-public class FlowControllingBlockingQueue
+public class FlowControllingBlockingQueue<E> extends LinkedBlockingQueue<E>
 {
-	private static final Logger _logger = LoggerFactory.getLogger(FlowControllingBlockingQueue.class);
-	
-    /** This queue is bounded and is used to store messages before being dispatched to the consumer */
-    private final Queue _queue = new ConcurrentLinkedQueue();
-
     private final int _flowControlHighThreshold;
     private final int _flowControlLowThreshold;
 
     private final ThresholdListener _listener;
-
-    /** We require a separate count so we can track whether we have reached the threshold */
-    private int _count;
     
-    private boolean disableFlowControl; 
-
-    public boolean isEmpty()
-    {
-        return _queue.isEmpty();
-    }
+    private boolean _disableFlowControl; 
 
     public interface ThresholdListener
     {
@@ -63,6 +42,11 @@ public class FlowControllingBlockingQueu
 
         void underThreshold(int currentValue);
     }
+    
+    public FlowControllingBlockingQueue()
+    {
+        this(0, null);
+    }
 
     public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
     {
@@ -71,65 +55,52 @@ public class FlowControllingBlockingQueu
 
     public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
     {
+        super();
+        
         _flowControlHighThreshold = highThreshold;
         _flowControlLowThreshold = lowThreshold;
         _listener = listener;
+        
         if (highThreshold == 0)
         {
-        	disableFlowControl = true;
+        	_disableFlowControl = true;
         }
     }
 
-    public Object take() throws InterruptedException
+    public E take() throws InterruptedException
     {
-        Object o = _queue.poll();
-        if(o == null)
-        {
-            synchronized(this)
-            {
-                while((o = _queue.poll())==null)
-                {
-                    wait();
-                }
-            }
-        }
-        if (!disableFlowControl && _listener != null)
+        E e = super.take();
+        
+        if (!_disableFlowControl && _listener != null)
         {
             synchronized (_listener)
             {
-                if (_count-- == _flowControlLowThreshold)
+                if (size() == _flowControlLowThreshold)
                 {
-                    _listener.underThreshold(_count);
+                    _listener.underThreshold(size());
                 }
             }
             
         }
 
-        return o;
+        return e;
     }
 
-    public void add(Object o)
+    public boolean add(E e)
     {
-        synchronized(this)
-        {
-            _queue.add(o);
-
-            notifyAll();
-        }
-        if (!disableFlowControl && _listener != null)
+        super.add(e);
+        
+        if (!_disableFlowControl && _listener != null)
         {
             synchronized (_listener)
             {
-                if (++_count == _flowControlHighThreshold)
+                if (size() == _flowControlHighThreshold)
                 {
-                    _listener.aboveThreshold(_count);
+                    _listener.aboveThreshold(size());
                 }
             }
         }
-    }
-
-    public Iterator iterator()
-    {
-        return _queue.iterator();
+        
+        return true;
     }
 }

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Wed Oct 13 15:05:29 2010
@@ -20,17 +20,19 @@
  */
 package org.apache.qpid.jms;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.qpid.client.SSLConfiguration;
 
 public interface BrokerDetails
 {
-
     /*
      * Known URL Options
      * @see ConnectionURL
-    */
+     */
+
     public static final String OPTIONS_RETRY = "retries";
     public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
     public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
@@ -54,6 +56,8 @@ public interface BrokerDetails
 
     public static final String SOCKET = "socket";
     public static final String TCP = "tcp";
+    public static final String UDP = "udp";
+    public static final String MULTICAST = "multicast";
     public static final String VM = "vm";
 
     public static final String DEFAULT_TRANSPORT = TCP;

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/AcceptorTest.java Wed Oct 13 15:05:29 2010
@@ -39,7 +39,7 @@ import junit.framework.TestCase;
  * Tests MINA socket performance. This acceptor simply reads data from the network and writes it back again.
  *
  */
-public class AcceptorTest extends TestCase
+public class AcceptorTest extends QpidTestCase
 {
     private static final Logger _logger = Logger.getLogger(AcceptorTest.class);
 

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/BlockingAcceptorTest.java Wed Oct 13 15:05:29 2010
@@ -30,7 +30,7 @@ import java.net.Socket;
 
 import junit.framework.TestCase;
 
-public class BlockingAcceptorTest extends TestCase
+public class BlockingAcceptorTest extends QpidTestCase
 {
     private static final Logger _logger = Logger.getLogger(BlockingAcceptorTest.class);
 

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/mina/WriterTest.java Wed Oct 13 15:05:29 2010
@@ -32,7 +32,7 @@ import java.util.concurrent.CountDownLat
 
 import junit.framework.TestCase;
 
-public class WriterTest extends TestCase
+public class WriterTest extends QpidTestCase
 {
     private static final Logger _logger = Logger.getLogger(WriterTest.class);
 

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/multiconsumer/AMQTest.java Wed Oct 13 15:05:29 2010
@@ -47,7 +47,7 @@ import org.apache.qpid.jms.Session;
 /**
  * Test AMQ.
  */
-public class AMQTest extends TestCase implements ExceptionListener
+public class AMQTest extends QpidTestCase implements ExceptionListener
 {
 
     private final static String COMPRESSION_PROPNAME = "_MSGAPI_COMP";

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/PropertiesFileInitialContextFactoryTest.java Wed Oct 13 15:05:29 2010
@@ -34,7 +34,7 @@ import java.io.InputStream;
 
 import junit.framework.TestCase;
 
-public class PropertiesFileInitialContextFactoryTest extends TestCase
+public class PropertiesFileInitialContextFactoryTest extends QpidTestCase
 {
     InitialContextFactory contextFactory;
     Properties _properties;

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueTest.java Wed Oct 13 15:05:29 2010
@@ -21,10 +21,9 @@
 package org.apache.qpid.client;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-import junit.framework.TestCase;
-
-public class AMQQueueTest extends TestCase
+public class AMQQueueTest extends QpidTestCase
 {
     AMQShortString exchange = new AMQShortString("test.exchange");
     AMQShortString routingkey = new AMQShortString("test-route");

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Wed Oct 13 15:05:29 2010
@@ -29,6 +29,7 @@ import org.apache.qpid.url.URLSyntaxExce
 
 import java.io.IOException;
 
+// FIXME
 public class MockAMQConnection extends AMQConnection
 {
     public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost)

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/message/AbstractJMSMessageTest.java Wed Oct 13 15:05:29 2010
@@ -23,9 +23,9 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import junit.framework.TestCase;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class AbstractJMSMessageTest extends TestCase
+public class AbstractJMSMessageTest extends QpidTestCase
 {
 
     public void testSetNullJMSReplyTo08() throws JMSException

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Wed Oct 13 15:05:29 2010
@@ -27,7 +27,6 @@ import org.apache.qpid.framing.AMQMethod
 import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.TestNetworkDriver;
 import org.apache.qpid.client.MockAMQConnection;
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.state.AMQState;
@@ -73,7 +72,8 @@ public class AMQProtocolHandlerTest exte
     {
         //Create a new ProtocolHandler with a fake connection.
         _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
-        _handler.setNetworkDriver(new TestNetworkDriver());
+        // FIXME
+//        _handler.setSender(new TestNetworkDriver());
          AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
         _blockFrame = new AMQFrame(0, body);
 
@@ -93,16 +93,14 @@ public class AMQProtocolHandlerTest exte
      */
     public void testFrameListenerUpdateWithAMQException() throws InterruptedException
     {
-        AMQException trigger = new AMQAuthenticationException(AMQConstant.ACCESS_REFUSED,
-                                                              "AMQPHTest", new RuntimeException());
+        AMQException trigger = new AMQAuthenticationException("AMQPHTest", new RuntimeException());
 
         performWithException(trigger);
 
 
         AMQException receivedException = (AMQException) _listener.getReceivedException();
 
-        assertEquals("Return exception was not the expected type",
-                     AMQAuthenticationException.class, receivedException.getClass());
+        assertTrue("Return exception was not the expected type", receivedException instanceof AMQException);
 
         assertEquals("The _Listener did not receive the correct error code",
                      trigger.getErrorCode(), receivedException.getErrorCode());

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTableKeyEnumeratorTest.java Wed Oct 13 15:05:29 2010
@@ -26,14 +26,13 @@ import java.util.NoSuchElementException;
 
 import javax.jms.JMSException;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class FieldTableKeyEnumeratorTest extends TestCase
+public class FieldTableKeyEnumeratorTest extends QpidTestCase
 {
     public void testTrue()
     {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/FieldTablePropertyTest.java Wed Oct 13 15:05:29 2010
@@ -24,12 +24,11 @@ import java.util.Enumeration;
 
 import javax.jms.JMSException;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class FieldTablePropertyTest extends TestCase
+public class FieldTablePropertyTest extends QpidTestCase
 {
     public void testPropertyNames()
     {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Wed Oct 13 15:05:29 2010
@@ -20,18 +20,11 @@
  */
 package org.apache.qpid.test.unit.client.BrokerDetails;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.AMQBrokerDetails;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
-public class BrokerDetailsTest extends TestCase
+public class BrokerDetailsTest extends QpidTestCase
 {
     public void testMultiParameters() throws URLSyntaxException
     {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Wed Oct 13 15:05:29 2010
@@ -20,15 +20,14 @@
  */
 package org.apache.qpid.test.unit.client.connectionurl;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.AMQBrokerDetails;
 import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
-public class ConnectionURLTest extends TestCase
+public class ConnectionURLTest extends QpidTestCase
 {
 
     public void testFailoverURL() throws URLSyntaxException

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/destinationurl/DestinationURLTest.java Wed Oct 13 15:05:29 2010
@@ -20,16 +20,15 @@
  */
 package org.apache.qpid.test.unit.client.destinationurl;
 
-import junit.framework.TestCase;
+import java.net.URISyntaxException;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URISyntaxException;
-
-public class DestinationURLTest extends TestCase
+public class DestinationURLTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(DestinationURLTest.class);
 

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/BytesMessageTest.java Wed Oct 13 15:05:29 2010
@@ -27,12 +27,11 @@ import javax.jms.MessageFormatException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.message.JMSBytesMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class BytesMessageTest extends TestCase
+public class BytesMessageTest extends QpidTestCase
 {
     /**
      * Tests that on creation a call to getBodyLength() throws an exception

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/MapMessageTest.java Wed Oct 13 15:05:29 2010
@@ -24,13 +24,13 @@ import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.qpid.client.message.JMSMapMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
 
 
-public class MapMessageTest extends TestCase
+public class MapMessageTest extends QpidTestCase
 {
 
     //Test Lookups

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java Wed Oct 13 15:05:29 2010
@@ -29,15 +29,14 @@ import javax.jms.MessageNotReadableExcep
 import javax.jms.MessageNotWriteableException;
 import javax.jms.StreamMessage;
 
-import junit.framework.TestCase;
-
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
 
 /**
  * @author Apache Software Foundation
  */
-public class StreamMessageTest extends TestCase
+public class StreamMessageTest extends QpidTestCase
 {
     /**
      * Tests that on creation a call to getBodyLength() throws an exception

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/TextMessageTest.java Wed Oct 13 15:05:29 2010
@@ -23,13 +23,13 @@ package org.apache.qpid.test.unit.client
 import javax.jms.JMSException;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.qpid.client.message.JMSMapMessage;
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.client.message.TestMessageHelper;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-public class TextMessageTest extends TestCase
+public class TextMessageTest extends QpidTestCase
 {
     public void testTextOnConstruction() throws Exception
     {

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Wed Oct 13 15:05:29 2010
@@ -20,13 +20,13 @@
  */
 package org.apache.qpid.test.unit.jndi;
 
-import junit.framework.TestCase;
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.url.URLSyntaxException;
 
-public class ConnectionFactoryTest extends TestCase
+public class ConnectionFactoryTest extends QpidTestCase
 {
 
     //URL will be returned with the password field swapped for '********'

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Wed Oct 13 15:05:29 2010
@@ -29,10 +29,9 @@ import javax.naming.InitialContext;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.test.utils.QpidTestCase;
 
-import junit.framework.TestCase;
-
-public class JNDIPropertyFileTest extends TestCase
+public class JNDIPropertyFileTest extends QpidTestCase
 {
     Context ctx;
     

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Wed Oct 13 15:05:29 2010
@@ -20,23 +20,24 @@
  */
 package org.apache.qpid.test.unit.message;
 
-import javax.jms.*;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.client.*;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.JMSTextMessage;
+import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import java.util.Map;
+import org.apache.qpid.test.utils.QpidTestCase;
 
 
-public class MessageConverterTest extends TestCase
+public class MessageConverterTest extends QpidTestCase
 {
 
     public static final String JMS_CORR_ID = "QPIDID_01";

Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,182 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.common.support.DefaultConnectFuture;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
+ * Extension of {@link SocketConnector} using an existing open socket.
+ */
+public class ExistingSocketConnector extends SocketConnector
+{
+    private static final Map<String, Socket> OPEN_SOCKET_REGISTER = new ConcurrentHashMap<String, Socket>();
+
+    private static final AtomicInteger nextId = new AtomicInteger();
+    private final int id = nextId.getAndIncrement();
+    private final SocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+    private int processorDistributor = 0;
+
+    private Socket _openSocket = null;
+
+    public static void registerOpenSocket(String socketID, Socket openSocket)
+    {
+        OPEN_SOCKET_REGISTER.put(socketID, openSocket);
+    }
+
+    public static Socket removeOpenSocket(String socketID)
+    {
+        return OPEN_SOCKET_REGISTER.remove(socketID);
+    }
+
+    public void setOpenSocket(Socket openSocket)
+    {
+        _openSocket = openSocket;
+    }
+
+    /**
+     * Create a connector with a single processing thread using a NewThreadExecutor
+     */
+    public ExistingSocketConnector()
+    {
+        this(1, new NewThreadExecutor());
+    }
+
+    /**
+     * Create a connector with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     * @param executor Executor to use for launching threads
+     */
+    public ExistingSocketConnector(int processorCount, Executor executor) {
+        if (processorCount < 1)
+        {
+            throw new IllegalArgumentException("Must have at least one processor");
+        }
+
+        this.processorCount = processorCount;
+        ioProcessors = new SocketIoProcessor[processorCount];
+
+        for (int i = 0; i < processorCount; i++)
+        {
+            ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor);
+        }
+    }
+
+    /**
+     * Changes here from the Mina OpenSocketConnector.
+     * 
+     * Ignoring all address as they are not needed.
+     */
+    public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, IoHandler handler, IoServiceConfig config)
+    {
+        if (handler == null)
+        {
+            throw new NullPointerException("handler");
+        }
+        if (config == null)
+        {
+            config = getDefaultConfig();
+        }
+        if (_openSocket == null)
+        {
+            throw new IllegalArgumentException("Specifed Socket not active");
+        }
+
+        boolean success = false;
+
+        try
+        {
+            DefaultConnectFuture future = new DefaultConnectFuture();
+            newSession(_openSocket.getChannel(), handler, config, future);
+            success = true;
+            return future;
+        }
+        catch (IOException e)
+        {
+            return DefaultConnectFuture.newFailedFuture(e);
+        }
+        finally
+        {
+            if (!success && _openSocket != null)
+            {
+                try
+                {
+                    _openSocket.close();
+                }
+                catch (IOException e)
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+            }
+        }
+    }
+
+    private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture)
+            throws IOException
+    {
+        SocketSessionImpl session = new SocketSessionImpl(this,
+                nextProcessor(), getListeners(), config, ch, handler,
+                ch.socket().getRemoteSocketAddress());
+        try
+        {
+            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            config.getThreadModel().buildFilterChain(session.getFilterChain());
+        }
+        catch (Throwable e)
+        {
+            throw (IOException) new IOException("Failed to create a session.").initCause(e);
+        }
+
+        // Set the ConnectFuture of the specified session, which will be
+        // removed and notified by AbstractIoFilterChain eventually.
+        session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, connectFuture);
+
+        // Forward the remaining process to the SocketIoProcessor.
+        session.getIoProcessor().addNew(session);
+    }
+
+    private SocketIoProcessor nextProcessor()
+    {
+        if (processorDistributor == Integer.MAX_VALUE)
+        {
+            processorDistributor = Integer.MAX_VALUE % processorCount;
+        }
+
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+}
\ No newline at end of file

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/AMQDisconnectedException.java Wed Oct 13 15:05:29 2010
@@ -32,6 +32,11 @@ package org.apache.qpid;
  */
 public class AMQDisconnectedException extends AMQException
 {
+    public AMQDisconnectedException(String msg)
+    {
+        super(null, msg);
+    }
+    
     public AMQDisconnectedException(String msg, Throwable cause)
     {
         super(null, msg, cause);

Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/BrokerOptions.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,230 @@
+package org.apache.qpid;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+public class BrokerOptions extends HashMap<String, List<String>>
+{
+    /** serialVersionUID */
+    private static final long serialVersionUID = 8051825964945442234L;
+    
+    public static final Integer DEFAULT_PORT = 5672;
+    public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+    public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
+    public static final String QPID_HOME = "QPID_HOME";
+    
+    public static final String PORTS = "p";
+    public static final String EXCLUDE_0_10 = "exclude-0-10";
+    public static final String EXCLUDE_0_9_1 = "exclude-0-9-1";
+    public static final String EXCLUDE_0_9 = "exclude-0-9";
+    public static final String EXCLUDE_0_8 = "exclude-0-8";
+    public static final String BIND = "b";
+    public static final String MANAGEMENT = "m";
+    public static final String LOG4J = "l";
+    public static final String WATCH = "w";
+    public static final String CONFIG = "c";
+    public static final String PROTOCOL = "t";
+    
+    public static final String[] COMMAND_LINE_OPTIONS = new String[] {
+        PORTS, EXCLUDE_0_10, EXCLUDE_0_9_1, EXCLUDE_0_9, EXCLUDE_0_8,
+        BIND, MANAGEMENT, LOG4J, WATCH, CONFIG, PROTOCOL,
+    };
+    
+    public void setPorts(Integer...ports)
+    {
+        put(PORTS, ports);
+    }
+    
+    public List<Integer> getPorts()
+    {
+        return getList(PORTS);
+    }
+    
+    public void setExclude_0_10Ports(Integer...ports)
+    {
+        put(EXCLUDE_0_10, ports);
+    }
+    
+    public List<Integer> getExclude_0_10Ports()
+    {
+        return getList(EXCLUDE_0_10);
+    }
+    
+    public void setExclude_0_9_1Ports(Integer...ports)
+    {
+        put(EXCLUDE_0_9_1, ports);
+    }
+    
+    public List<Integer> getExclude_0_9_1Ports()
+    {
+        return getList(EXCLUDE_0_9_1);
+    }
+    
+    public void setExclude_0_9Ports(Integer...ports)
+    {
+        put(EXCLUDE_0_9, ports);
+    }
+    
+    public List<Integer> getExclude_0_9Ports()
+    {
+        return getList(EXCLUDE_0_9);
+    }
+    
+    public void setExclude_0_8Ports(Integer...ports)
+    {
+        put(EXCLUDE_0_8, ports);
+    }
+    
+    public List<Integer> getExclude_0_8Ports()
+    {
+        return getList(EXCLUDE_0_8);
+    }
+    
+    public void setManagementPort(Integer management)
+    {
+        put(MANAGEMENT, Integer.toString(management));
+    }
+    
+    public Integer getManagementPort()
+    {
+        return getInteger(MANAGEMENT);
+    }
+    
+    public void setBind(String bind)
+    {
+        put(BIND, bind);
+    }
+    
+    public String getBind()
+    {
+        return getValue(BIND);
+    }
+    
+    public void setLog4JFile(String log4j)
+    {
+        put(LOG4J, log4j);
+    }
+    
+    public String getLog4JFile()
+    {
+        return getValue(LOG4J);
+    }
+    
+    public void setLog4JWatch(Integer watch)
+    {
+        put(WATCH, Integer.toString(watch));
+    }
+    
+    public Integer getLog4JWatch()
+    {
+        return getInteger(WATCH);
+    }
+    
+    public void setConfigFile(String config)
+    {
+        put(CONFIG, config);
+    }
+    
+    public String getConfigFile()
+    {
+        return getValue(CONFIG);
+    }
+    
+    public void setProtocol(String protocol)
+    {
+        put(PROTOCOL, protocol);
+    }
+    
+    public String getProtocol()
+    {
+        return getValue(PROTOCOL);
+    }
+    
+    public void put(String key, String value)
+    {
+        if (value != null)
+        {
+	        put(key, Collections.singletonList(value));
+        }
+    }
+    
+    public void put(String key, String...values)
+    {
+        if (values != null)
+        {
+            put(key, Arrays.asList(values));
+        }
+    }
+    
+    public void put(String key, Integer...values)
+    {
+        List<String> list = new ArrayList<String>();
+        for (Integer i : values)
+        {
+            list.add(Integer.toString(i));
+        }
+        put(key, list);
+    }
+    
+    public Integer getInteger(Object key)
+    {
+        return getInteger(key, null);
+    }
+    
+    public Integer getInteger(Object key, Integer defaultValue)
+    {
+        if (!containsKey(key))
+        {
+            return defaultValue;
+        }
+        List<String> values = get(key);
+        return Integer.valueOf(values.get(0));
+    }
+    
+    public List<Integer> getList(Object key)
+    {
+        return getList(key, null);
+    }
+    
+    public List<Integer> getList(Object key, List<Integer> defaultValues)
+    {
+        if (!containsKey(key))
+        {
+            return defaultValues;
+        }
+        List<String> list = get(key);
+        List<Integer> values = new ArrayList<Integer>();
+        for (String s : list)
+        {
+            values.add(Integer.valueOf(s));
+        }
+        return values;
+    }
+    
+    public String getValue(Object key)
+    {
+        return getValue(key, null);
+    }
+    
+    public String getValue(Object key, String defaultValue)
+    {
+        if (!containsKey(key))
+        {
+            return defaultValue;
+        }
+        List<String> values = get(key);
+        return values.get(0);
+    }
+    
+    public List<String> get(Object key, List<String> defaultValues)
+    {
+        if (!containsKey(key))
+        {
+            return defaultValues;
+        }
+        return get(key);
+    }
+}

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/common/Closeable.java Wed Oct 13 15:05:29 2010
@@ -20,7 +20,11 @@
  */
 package org.apache.qpid.common;
 
-
+/**
+ * Interface indicating an object can be closed.
+ * 
+ * Used as a marker for various components of the broker application registry, to allow clean shutdown.
+ */
 public interface Closeable
 {
     public void close();

Added: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java?rev=1022127&view=auto
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java (added)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/protocol/ReceiverFactory.java Wed Oct 13 15:05:29 2010
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.NetworkTransport;
+
+public interface ReceiverFactory  
+{ 
+    /**
+     * Returns a new instance of a {@link Receiver}. 
+     */
+    Receiver<ByteBuffer> newReceiver(NetworkTransport transport, NetworkConnection network); 
+} 
\ No newline at end of file

Modified: qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java?rev=1022127&r1=1022126&r2=1022127&view=diff
==============================================================================
--- qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java (original)
+++ qpid/branches/grkvlt-network-20101013/qpid/java/common/src/main/java/org/apache/qpid/thread/RealtimeThreadFactory.java Wed Oct 13 15:05:29 2010
@@ -20,49 +20,55 @@ package org.apache.qpid.thread;
  * 
  */
 
-
 import java.lang.reflect.Constructor;
+import java.util.concurrent.ThreadFactory;
 
 public class RealtimeThreadFactory implements ThreadFactory
 {
-    private Class threadClass;
-    private Constructor threadConstructor;
-    private Constructor priorityParameterConstructor;
-    private int defaultRTThreadPriority = 20;
+    private Class<?> _threadClass;
+    private Constructor<?> _threadConstructor;
+    private Constructor<?> _priorityParameterConstructor;
+    private int _defaultRTThreadPriority = 20;
     
     public RealtimeThreadFactory() throws Exception
     {
-        defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority",20);
-        threadClass = Class.forName("javax.realtime.RealtimeThread");
+        _defaultRTThreadPriority = Integer.getInteger("qpid.rt_thread_priority", 20);
+        _threadClass = Class.forName("javax.realtime.RealtimeThread");
     
-        Class schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
-        Class releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
-        Class memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
-        Class memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
-        Class processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
+        Class<?> schedulingParametersClass = Class.forName("javax.realtime.SchedulingParameters");
+        Class<?> releaseParametersClass = Class.forName("javax.realtime.ReleaseParameters");
+        Class<?> memoryParametersClass = Class.forName("javax.realtime.MemoryParameters");
+        Class<?> memoryAreaClass = Class.forName("javax.realtime.MemoryArea");
+        Class<?> processingGroupParametersClass = Class.forName("javax.realtime.ProcessingGroupParameters");
      
-        Class[] paramTypes = new Class[]{schedulingParametersClass,
-                                         releaseParametersClass, 
-                                         memoryParametersClass,
-                                         memoryAreaClass,
-                                         processingGroupParametersClass,
-                                         java.lang.Runnable.class};
+        Class<?>[] paramTypes = new Class[] { schedulingParametersClass,
+                                              releaseParametersClass, 
+                                              memoryParametersClass,
+                                              memoryAreaClass,
+                                              processingGroupParametersClass,
+                                              java.lang.Runnable.class };
         
-        threadConstructor = threadClass.getConstructor(paramTypes);
+        _threadConstructor = _threadClass.getConstructor(paramTypes);
         
-        Class priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
-        priorityParameterConstructor = priorityParameterClass.getConstructor(new Class[]{int.class});        
+        Class<?> priorityParameterClass = Class.forName("javax.realtime.PriorityParameters");
+        _priorityParameterConstructor = priorityParameterClass.getConstructor(new Class<?>[] { Integer.TYPE });        
     }
 
-    public Thread createThread(Runnable r) throws Exception
+    public Thread newThread(Runnable r)
     {
-        return createThread(r,defaultRTThreadPriority);
+        return createThread(r,_defaultRTThreadPriority);
     }
 
-    public Thread createThread(Runnable r, int priority) throws Exception
+    public Thread createThread(Runnable r, int priority)
     {
-        Object priorityParams = priorityParameterConstructor.newInstance(priority);
-        return (Thread)threadConstructor.newInstance(priorityParams,null,null,null,null,r);
+        try
+        {
+	        Object priorityParams = _priorityParameterConstructor.newInstance(priority);
+	        return (Thread) _threadConstructor.newInstance(priorityParams, null, null, null, null, r);
+        }
+        catch (Exception e)
+        {
+            return null;
+        }
     }
-
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org