You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [32/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Fri Oct 21 14:42:12 2011
@@ -20,15 +20,21 @@
  */
 package org.apache.qpid.client.message;
 
+import java.io.DataInputStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
 
 import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.util.Strings;
@@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
 {
     private static final String MIME_TYPE = "text/plain";
 
+    private Exception _exception;
     private String _decodedValue;
 
     /**
@@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
     private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
-    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
-    {
-        this(delegateFactory, null, null);
-    }
+    private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder();
+    private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder();
+
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
 
-    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
+    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        super(delegateFactory, data); // this instantiates a content header
-        setContentType(getMimeType());
-        setEncoding(encoding);
+        super(delegateFactory, false); // this instantiates a content header
     }
 
     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
             throws AMQException
     {
-        super(delegate, data);
-        setContentType(getMimeType());
-        _data = data;
-    }
+        super(delegate, data!=null);
 
-
-    public void clearBodyImpl() throws JMSException
-    {
-        if (_data != null)
+        try
         {
-            _data.release();
-            _data = null;
+            if(propertyExists(PAYLOAD_NULL_PROPERTY))
+            {
+                _decodedValue = null;
+            }
+            else
+            {
+                _decodedValue = _decoder.decode(data).toString();
+            }
+        }
+        catch (CharacterCodingException e)
+        {
+            _exception = e;
+        }
+        catch (JMSException e)
+        {
+            _exception = e;
         }
 
-        _decodedValue = null;
     }
 
     public String toBodyString() throws JMSException
@@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
         return MIME_TYPE;
     }
 
-    public void setText(String text) throws JMSException
+    @Override
+    public ByteBuffer getData() throws JMSException
     {
-        checkWritable();
-
-        clearBody();
+        _encoder.reset();
         try
         {
-            if (text != null)
+            if(_exception != null)
+            {
+                final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message");
+                messageFormatException.setLinkedException(_exception);
+                throw messageFormatException;
+            }
+            else if(_decodedValue == null)
+            {
+                return EMPTY_BYTE_BUFFER;
+            }
+            else
             {
-                final String encoding = getEncoding();
-                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
-                {
-                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
-                    setEncoding("UTF-8");
-                }
-                else
-                {
-                    _data = ByteBuffer.wrap(text.getBytes(encoding));
-                }
-                _data.position(_data.limit());
-                _changedData=true;
+                return _encoder.encode(CharBuffer.wrap(_decodedValue));
             }
-            _decodedValue = text;
         }
-        catch (UnsupportedEncodingException e)
+        catch (CharacterCodingException e)
         {
-            // should never occur
-            JMSException jmse = new JMSException("Unable to decode text data");
-            jmse.setLinkedException(e);
-            jmse.initCause(e);
-            throw jmse;
+            final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue);
+            jmsException.setLinkedException(e);
+            throw jmsException;
         }
     }
 
-    public String getText() throws JMSException
+    @Override
+    public void clearBody() throws JMSException
     {
-        if (_data == null && _decodedValue == null)
-        {
-            return null;
-        }
-        else if (_decodedValue != null)
-        {
-            return _decodedValue;
-        }
-        else
-        {
-            _data.rewind();
+        super.clearBody();
+        _decodedValue = null;
+        _exception = null;
+    }
 
-            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
-            {
-                return null;
-            }
-            if (getEncoding() != null)
-            {
-                try
-                {
-                    _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
-                }
-                catch (CharacterCodingException e)
-                {
-                    JMSException jmse = new JMSException("Could not decode string data: " + e);
-                    jmse.setLinkedException(e);
-                    jmse.initCause(e);
-                    throw jmse;
-                }
-            }
-            else
-            {
-                try
-                {
-                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
-                }
-                catch (CharacterCodingException e)
-                {
-                    JMSException jmse = new JMSException("Could not decode string data: " + e);
-                    jmse.setLinkedException(e);
-                    jmse.initCause(e);
-                    throw jmse;
-                }
-            }
-            return _decodedValue;
-        }
+    public void setText(String text) throws JMSException
+    {
+        checkWritable();
+
+        clearBody();
+        _decodedValue = text;
+
+    }
+
+    public String getText() throws JMSException
+    {
+        return _decodedValue;
     }
 
     @Override
     public void prepareForSending() throws JMSException
     {
         super.prepareForSending();
-        if (_data == null)
+        if (_decodedValue == null)
         {
             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
         }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Fri Oct 21 14:42:12 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Fri Oct 21 14:42:12 2011
@@ -104,7 +104,7 @@ public class MessageFactoryRegistry
                                             AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
             throws AMQException, JMSException
     {
-        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
 
         // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
         // AMQP. When the type is null, it can only be assumed that the message is a byte message.

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Oct 21 14:42:12 2011
@@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
     public void receiveBody(ContentBody body)
     {
 
-        if (body.payload != null)
+        if (body._payload != null)
         {
-            final long payloadSize = body.payload.remaining();
+            final long payloadSize = body._payload.length;
 
             if (_bodies == null)
             {

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java Fri Oct 21 14:42:12 2011
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQDestination.Binding;
+import org.apache.qpid.client.messaging.address.Link.Reliability;
 import org.apache.qpid.client.messaging.address.Link.Subscription;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
 import org.apache.qpid.client.messaging.address.Node.QueueNode;
@@ -54,7 +55,7 @@ public class AddressHelper
     public static final String EXCLUSIVE = "exclusive";
     public static final String AUTO_DELETE = "auto-delete";
     public static final String TYPE = "type";
-    public static final String ALT_EXCHANGE = "alt-exchange";
+    public static final String ALT_EXCHANGE = "alternate-exchange";
     public static final String BINDINGS = "bindings";
     public static final String BROWSE = "browse";
     public static final String MODE = "mode";
@@ -231,14 +232,9 @@ public class AddressHelper
     
     private boolean getDurability(Map map)
     {
-        if (map != null && map.get(DURABLE) != null)
-        {
-            return Boolean.parseBoolean((String)map.get(DURABLE));
-        }
-        else
-        {
-            return false;
-        }
+        Accessor access = new MapAccessor(map);
+        Boolean result = access.getBoolean(DURABLE);
+        return (result == null) ? false : result.booleanValue();
     }
 
     /**
@@ -262,7 +258,7 @@ public class AddressHelper
         }
     }
 
-    public Link getLink()
+    public Link getLink() throws Exception
     {
         Link link = new Link();
         link.setSubscription(new Subscription());
@@ -272,6 +268,25 @@ public class AddressHelper
                     : linkProps.getBoolean(DURABLE));
             link.setName(linkProps.getString(NAME));
 
+            String reliability = linkProps.getString(RELIABILITY);
+            if ( reliability != null)
+            {
+                if (reliability.equalsIgnoreCase("unreliable"))
+                {
+                    link.setReliability(Reliability.UNRELIABLE);
+                }
+                else if (reliability.equalsIgnoreCase("at-least-once"))
+                {
+                    link.setReliability(Reliability.AT_LEAST_ONCE);
+                }
+                else
+                {
+                    throw new Exception("The reliability mode '" + 
+                            reliability + "' is not yet supported");
+                }
+                
+            }
+            
             if (((Map) address.getOptions().get(LINK)).get(CAPACITY) instanceof Map)
             {
                 MapAccessor capacityProps = new MapAccessor(

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client.messaging.address;
 
+import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,6 +31,8 @@ public class Link
 { 
     public enum FilterType { SQL92, XQUERY, SUBJECT }
     
+    public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+    
     protected String name;
     protected String _filter;
     protected FilterType _filterType = FilterType.SUBJECT;
@@ -38,7 +42,18 @@ public class Link
     protected int _producerCapacity = 0;
     protected Node node;
     protected Subscription subscription;
+    protected Reliability reliability = UNSPECIFIED;
     
+    public Reliability getReliability()
+    {
+        return reliability;
+    }
+
+    public void setReliability(Reliability reliability)
+    {
+        this.reliability = reliability;
+    }
+
     public Node getNode()
     {
         return node;

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -28,10 +30,8 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.filter.codec.ProtocolCodecException;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -46,6 +46,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
@@ -57,16 +58,13 @@ import org.apache.qpid.framing.Heartbeat
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.io.IoTransport;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,20 +162,22 @@ public class AMQProtocolHandler implemen
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
-    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
+    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
     private AMQCodecFactory _codecFactory;
-    private Job _readJob;
-    private Job _writeJob;
-    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
-    private NetworkDriver _networkDriver;
+
     private ProtocolVersion _suggestedProtocolVersion;
 
     private long _writtenBytes;
     private long _readBytes;
 
+    private NetworkConnection _network;
+    private Sender<ByteBuffer> _sender;
+
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -189,43 +189,10 @@ public class AMQProtocolHandler implemen
         _protocolSession = new AMQProtocolSession(this, _connection);
         _stateManager = new AMQStateManager(_protocolSession);
         _codecFactory = new AMQCodecFactory(false, _protocolSession);
-        _poolReference.setThreadFactory(new ThreadFactory()
-        {
-
-            public Thread newThread(final Runnable runnable)
-            {
-                try
-                {
-                    return Threading.getThreadFactory().createThread(runnable);
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException("Failed to create thread", e);
-                }
-            }
-        });
-        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
-        _poolReference.acquireExecutorService();
         _failoverHandler = new FailoverHandler(this);
     }
 
     /**
-     * Called when we want to create a new IoTransport session
-     * @param brokerDetail
-     */
-    public void createIoTransportSession(BrokerDetails brokerDetail)
-    {
-        _protocolSession = new AMQProtocolSession(this, _connection);
-        _stateManager.setProtocolSession(_protocolSession);
-        IoTransport.connect_0_9(getProtocolSession(),
-                                brokerDetail.getHost(),
-                                brokerDetail.getPort(),
-                                brokerDetail.getBooleanProperty(BrokerDetails.OPTIONS_SSL));
-        _protocolSession.init();
-    }
-
-    /**
      * Called when the network connection is closed. This can happen, either because the client explicitly requested
      * that the connection be closed, in which case nothing is done, or because the connection died. In the case
      * where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -315,7 +282,7 @@ public class AMQProtocolHandler implemen
         //  failover:
         HeartbeatDiagnostics.timeout();
         _logger.warn("Timed out while waiting for heartbeat from peer.");
-        _networkDriver.close();
+        _network.close();
     }
 
     public void writerIdle()
@@ -337,22 +304,12 @@ public class AMQProtocolHandler implemen
             {
                 _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
                 // this will attempt failover
-                _networkDriver.close();
+                _network.close();
                 closed();
             }
             else
             {
-
-                if (cause instanceof ProtocolCodecException)
-                {
-                    _logger.info("Protocol Exception caught NOT going to attempt failover as " +
-                                 "cause isn't AMQConnectionClosedException: " + cause, cause);
-
-                    AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-                    propagateExceptionToAllWaiters(amqe);
-                }
                 _connection.exceptionReceived(cause);
-
             }
 
             // FIXME Need to correctly handle other exceptions. Things like ...
@@ -446,76 +403,63 @@ public class AMQProtocolHandler implemen
 
     public void received(ByteBuffer msg)
     {
+        _readBytes += msg.remaining();
         try
         {
-            _readBytes += msg.remaining();
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+            // Decode buffer
+
+            for (AMQDataBlock message : dataBlocks)
             {
 
-                public void run()
-                {
-                    // Decode buffer
+                    if (PROTOCOL_DEBUG)
+                    {
+                        _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+                    }
 
-                    for (AMQDataBlock message : dataBlocks)
+                    if(message instanceof AMQFrame)
                     {
+                        final boolean debug = _logger.isDebugEnabled();
+                        final long msgNumber = ++_messageReceivedCount;
 
-                        try
-                        {
-                            if (PROTOCOL_DEBUG)
-                            {
-                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
-                            }
-
-                            if(message instanceof AMQFrame)
-                            {
-                                final boolean debug = _logger.isDebugEnabled();
-                                final long msgNumber = ++_messageReceivedCount;
-
-                                if (debug && ((msgNumber % 1000) == 0))
-                                {
-                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-                                }
-
-                                AMQFrame frame = (AMQFrame) message;
-
-                                final AMQBody bodyFrame = frame.getBodyFrame();
-
-                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
-                                bodyFrame.handle(frame.getChannel(), _protocolSession);
-
-                                _connection.bytesReceived(_readBytes);
-                            }
-                            else if (message instanceof ProtocolInitiation)
-                            {
-                                // We get here if the server sends a response to our initial protocol header
-                                // suggesting an alternate ProtocolVersion; the server will then close the
-                                // connection.
-                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-                                _suggestedProtocolVersion = protocolInit.checkVersion();
-                                _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-                                
-                                // get round a bug in old versions of qpid whereby the connection is not closed
-                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
-                            }
-                        }
-                        catch (Exception e)
+                        if (debug && ((msgNumber % 1000) == 0))
                         {
-                            _logger.error("Exception processing frame", e);
-                            propagateExceptionToFrameListeners(e);
-                            exception(e);
+                            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
                         }
+
+                        AMQFrame frame = (AMQFrame) message;
+
+                        final AMQBody bodyFrame = frame.getBodyFrame();
+
+                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+                        bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+                        _connection.bytesReceived(_readBytes);
+                    }
+                    else if (message instanceof ProtocolInitiation)
+                    {
+                        // We get here if the server sends a response to our initial protocol header
+                        // suggesting an alternate ProtocolVersion; the server will then close the
+                        // connection.
+                        ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+                        _suggestedProtocolVersion = protocolInit.checkVersion();
+                        _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+                        // get round a bug in old versions of qpid whereby the connection is not closed
+                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
                     }
                 }
-            });
         }
         catch (Exception e)
         {
+            _logger.error("Exception processing frame", e);
             propagateExceptionToFrameListeners(e);
             exception(e);
         }
+
+
     }
 
     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -570,28 +514,13 @@ public class AMQProtocolHandler implemen
         return getStateManager().createWaiter(states);
     }
 
-    /**
-     * Convenience method that writes a frame to the protocol session. Equivalent to calling
-     * getProtocolSession().write().
-     *
-     * @param frame the frame to write
-     */
-    public void writeFrame(AMQDataBlock frame)
+    public  synchronized void writeFrame(AMQDataBlock frame)
     {
-        writeFrame(frame, false);
-    }
-
-    public void writeFrame(AMQDataBlock frame, boolean wait)
-    {
-        final ByteBuffer buf = frame.toNioByteBuffer();
+        final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
-        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
-        {
-            public void run()
-            {
-                _networkDriver.send(buf);
-            }
-        });
+        _sender.send(buf);
+        _sender.flush();
+
         if (PROTOCOL_DEBUG)
         {
             _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -608,12 +537,41 @@ public class AMQProtocolHandler implemen
 
         _connection.bytesSent(_writtenBytes);
 
-        if (wait)
+    }
+
+    private ByteBuffer asByteBuffer(AMQDataBlock block)
+    {
+        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+        try
         {
-            _networkDriver.flush();
+            block.writePayload(new DataOutputStream(new OutputStream()
+            {
+
+
+                @Override
+                public void write(int b) throws IOException
+                {
+                    buf.put((byte) b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException
+                {
+                    buf.put(b, off, len);
+                }
+            }));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
         }
+
+        buf.flip();
+        return buf;
     }
 
+
     /**
      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
      * calling getProtocolSession().write() then waiting for the response.
@@ -707,24 +665,23 @@ public class AMQProtocolHandler implemen
      * <p/>If a failover exception occurs whilst closing the connection it is ignored, as the connection is closed
      * anyway.
      *
-     * @param timeout The timeout to wait for an acknowledgement to the close request.
+     * @param timeout The timeout to wait for an acknowledgment to the close request.
      *
      * @throws AMQException If the close fails for any reason.
      */
     public void closeConnection(long timeout) throws AMQException
     {
-        ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                                                                                                  new AMQShortString("JMS client is closing the connection."), 0, 0);
-
-        final AMQFrame frame = body.generateFrame(0);
-
-        //If the connection is already closed then don't do a syncWrite
         if (!getStateManager().getCurrentState().equals(AMQState.CONNECTION_CLOSED))
         {
+            // Only do a syncWrite if the connection is not already closed
             try
             {
+                final ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                        new AMQShortString("JMS client is closing the connection."), 0, 0);
+                final AMQFrame frame = body.generateFrame(0);
+
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _networkDriver.close();
+                _network.close();
                 closed();
             }
             catch (AMQTimeoutException e)
@@ -733,10 +690,9 @@ public class AMQProtocolHandler implemen
             }
             catch (FailoverException e)
             {
-                _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
+                _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
             }
         }
-        _poolReference.releaseExecutorService();
     }
 
     /** @return the number of bytes read from this protocol session */
@@ -844,17 +800,23 @@ public class AMQProtocolHandler implemen
 
     public SocketAddress getRemoteAddress()
     {
-        return _networkDriver.getRemoteAddress();
+        return _network.getRemoteAddress();
     }
 
     public SocketAddress getLocalAddress()
     {
-        return _networkDriver.getLocalAddress();
+        return _network.getLocalAddress();
+    }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        setNetworkConnection(network, network.getSender());
     }
 
-    public void setNetworkDriver(NetworkDriver driver)
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
     {
-        _networkDriver = driver;
+        _network = network;
+        _sender = sender;
     }
 
     /** @param delay delay in seconds (not ms) */
@@ -862,15 +824,15 @@ public class AMQProtocolHandler implemen
     {
         if (delay > 0)
         {
-            getNetworkDriver().setMaxWriteIdle(delay);
-            getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
+            _network.setMaxWriteIdle(delay);
+            _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay));
             HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
         }
     }
 
-    public NetworkDriver getNetworkDriver()
+    public NetworkConnection getNetworkConnection()
     {
-        return _networkDriver;
+        return _network;
     }
 
     public ProtocolVersion getSuggestedProtocolVersion()

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Oct 21 14:42:12 2011
@@ -20,27 +20,36 @@
  */
 package org.apache.qpid.client.protocol;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.jms.JMSException;
 import javax.security.sasl.SaslClient;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodDispatcher;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.transport.Sender;
-import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
+import org.apache.qpid.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -148,16 +157,6 @@ public class AMQProtocolSession implemen
         return getAMQConnection().getVirtualHost();
     }
 
-    public String getUsername()
-    {
-        return getAMQConnection().getUsername();
-    }
-
-    public String getPassword()
-    {
-        return getAMQConnection().getPassword();
-    }
-
     public SaslClient getSaslClient()
     {
         return _saslClient;
@@ -299,22 +298,11 @@ public class AMQProtocolSession implemen
         return _connection.getSession(channelId);
     }
 
-    /**
-     * Convenience method that writes a frame to the protocol session. Equivalent to calling
-     * getProtocolSession().write().
-     *
-     * @param frame the frame to write
-     */
     public void writeFrame(AMQDataBlock frame)
     {
         _protocolHandler.writeFrame(frame);
     }
 
-    public void writeFrame(AMQDataBlock frame, boolean wait)
-    {
-        _protocolHandler.writeFrame(frame, wait);
-    }
-
     /**
      * Starts the process of closing a session
      *
@@ -375,7 +363,15 @@ public class AMQProtocolSession implemen
 
     public void closeProtocolSession() throws AMQException
     {
-        _protocolHandler.closeConnection(0);
+        try
+        {
+            _protocolHandler.getNetworkConnection().close();
+        }
+        catch(TransportException e)
+        {
+            //ignore such exceptions, they were already logged
+            //and this is a forcible close.
+        }
     }
 
     public void failover(String host, int port)

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/AMQCallbackHandler.java Fri Oct 21 14:42:12 2011
@@ -22,9 +22,9 @@ package org.apache.qpid.client.security;
 
 import javax.security.auth.callback.CallbackHandler;
 
-import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionURL;
 
 public interface AMQCallbackHandler extends CallbackHandler
 {
-    void initialise(AMQProtocolSession protocolSession);    
+    void initialise(ConnectionURL connectionURL);    
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Fri Oct 21 14:42:12 2011
@@ -20,17 +20,22 @@
  */
 package org.apache.qpid.client.security;
 
-import org.apache.qpid.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
@@ -42,7 +47,7 @@ import java.util.Properties;
  * "amp.callbackhandler.properties". The format of the properties file is:
  *
  * <p/><pre>
- * CallbackHanlder.mechanism=fully.qualified.class.name
+ * mechanism.n=fully.qualified.class.name where n is an integer ordinal giving the order of preference
  * </pre>
  *
  * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
@@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
     public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
 
     /** A static reference to the singleton instance of this registry. */
-    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+    private static final CallbackHandlerRegistry _instance;
 
     /** Holds a map from SASL mechanism names to call back handlers. */
-    private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
-
-    /** Holds a space delimited list of mechanisms that callback handlers exist for. */
-    private String _mechanisms;
-
-    /**
-     * Gets the singleton instance of this registry.
-     *
-     * @return The singleton instance of this registry.
-     */
-    public static CallbackHandlerRegistry getInstance()
-    {
-        return _instance;
-    }
+    private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
 
-    /**
-     * Gets the callback handler class for a given SASL mechanism name.
-     *
-     * @param mechanism The SASL mechanism name.
-     *
-     * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
-     */
-    public Class getCallbackHandlerClass(String mechanism)
-    {
-        return (Class) _mechanismToHandlerClassMap.get(mechanism);
-    }
+    /** Ordered collection of mechanisms for which callback handlers exist. */
+    private Collection<String> _mechanisms;
 
-    /**
-     * Gets a space delimited list of supported SASL mechanisms.
-     *
-     * @return A space delimited list of supported SASL mechanisms.
-     */
-    public String getMechanisms()
-    {
-        return _mechanisms;
-    }
-
-    /**
-     * Creates the call back handler registry from its configuration resource or file. This also has the side effect
-     * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
-     */
-    private CallbackHandlerRegistry()
+    static
     {
         // Register any configured SASL client factories.
         DynamicSaslRegistrar.registerSaslProviders();
@@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
             FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
                 CallbackHandlerRegistry.class.getClassLoader());
 
+        final Properties props = new Properties();
+
         try
         {
-            Properties props = new Properties();
+
             props.load(is);
-            parseProperties(props);
-            _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
         }
         catch (IOException e)
         {
@@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
                 }
             }
         }
+
+        _instance = new CallbackHandlerRegistry(props);
+        _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
+
     }
 
-    /*private InputStream openPropertiesInputStream(String filename)
+    /**
+     * Gets the singleton instance of this registry.
+     *
+     * @return The singleton instance of this registry.
+     */
+    public static CallbackHandlerRegistry getInstance()
+    {
+        return _instance;
+    }
+
+    public AMQCallbackHandler createCallbackHandler(final String mechanism)
     {
-        boolean useDefault = true;
-        InputStream is = null;
-        if (filename != null)
+        final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
+
+        if (mechanismClass == null)
         {
-            try
-            {
-                is = new BufferedInputStream(new FileInputStream(new File(filename)));
-                useDefault = false;
-            }
-            catch (FileNotFoundException e)
-            {
-                _logger.error("Unable to read from file " + filename + ": " + e, e);
-            }
+            throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
         }
 
-        if (useDefault)
+        try
+        {
+            return mechanismClass.newInstance();
+        }
+        catch (InstantiationException e)
+        {
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
+        }
+        catch (IllegalAccessException e)
         {
-            is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
         }
+    }
 
-        return is;
-    }*/
+    /**
+     * Gets the collection of supported SASL mechanism names, ordered by preference.
+     *
+     * @return collection of SASL mechanism names.
+     */
+    public Collection<String> getMechanisms()
+    {
+        return Collections.unmodifiableCollection(_mechanisms);
+    }
+
+    /**
+     * Creates the call back handler registry from the supplied properties.
+     *
+     * Registration of the SASL client factory implementations using {@link DynamicSaslRegistrar}
+     * is performed by this class's static initialiser, not by this constructor.
+     *
+     * This constructor has default (package-private) access to allow for effective unit testing.  Clients must use
+     * {@link #getInstance()} to obtain the singleton instance.
+     */
+    CallbackHandlerRegistry(final Properties props)
+    {
+        parseProperties(props);
+    }
 
     /**
      * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
@@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
      */
     private void parseProperties(Properties props)
     {
+
+        final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
+
         Enumeration e = props.propertyNames();
         while (e.hasMoreElements())
         {
-            String propertyName = (String) e.nextElement();
-            int period = propertyName.indexOf(".");
-            if (period < 0)
-            {
-                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
+            final String propertyName = (String) e.nextElement();
+            final String[] parts = propertyName.split("\\.", 2);
 
-                continue;
-            }
+            checkPropertyNameFormat(propertyName, parts);
 
-            String mechanism = propertyName.substring(period + 1);
-            String className = props.getProperty(propertyName);
+            final String mechanism = parts[0];
+            final int ordinal = getPropertyOrdinal(propertyName, parts);
+            final String className = props.getProperty(propertyName);
             Class clazz = null;
             try
             {
@@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
                 {
                     _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
                         + ". Skipping");
-
                     continue;
                 }
-
                 _mechanismToHandlerClassMap.put(mechanism, clazz);
-                if (_mechanisms == null)
-                {
-                    _mechanisms = mechanism;
-                }
-                else
-                {
-                    // one time cost
-                    _mechanisms = _mechanisms + " " + mechanism;
-                }
+
+                mechanisms.put(ordinal, mechanism);
             }
             catch (ClassNotFoundException ex)
             {
@@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
                 continue;
             }
         }
+
+        _mechanisms = mechanisms.values();  // order guaranteed by keys of treemap (i.e. our ordinals)
+
+
+    }
+
+    private void checkPropertyNameFormat(final String propertyName, final String[] parts)
+    {
+        if (parts.length != 2)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
+        }
+    }
+
+    private int getPropertyOrdinal(final String propertyName, final String[] parts)
+    {
+        try
+        {
+            return Integer.parseInt(parts[1]);
+        }
+        catch(NumberFormatException nfe)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
+        }
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.  If more than one
+     * mechanism is mutually available the one appearing first (by ordinal) will be returned.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+
+        return selectMechInternal(peerList, Collections.<String>emptySet());
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @param restrictionList space separated list of mechanisms
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList, final String restrictionList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+        final Set<String> restrictionSet = mechListToSet(restrictionList);
+
+        return selectMechInternal(peerList, restrictionSet);
+    }
+
+    private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
+    {
+        for (final String mech : _mechanisms)
+        {
+            if (peerSet.contains(mech))
+            {
+                if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
+                {
+                    return mech;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private Set<String> mechListToSet(final String mechanismList)
+    {
+        if (mechanismList == null)
+        {
+            return Collections.emptySet();
+        }
+
+        final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
+        final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
+        while (tokenizer.hasMoreTokens())
+        {
+            mechanismSet.add(tokenizer.nextToken());
+        }
+        return Collections.unmodifiableSet(mechanismSet);
     }
+
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Fri Oct 21 14:42:12 2011
@@ -16,7 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+
+#
+# Format:
+# <mechanism name>.<ordinal>=<implementation>
+#
+# @see CallbackHandlerRegistry
+#
+
+EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/DynamicSaslRegistrar.properties Fri Oct 21 14:42:12 2011
@@ -18,3 +18,4 @@
 #
 AMQPLAIN=org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory
 CRAM-MD5-HASHED=org.apache.qpid.client.security.crammd5hashed.CRAMMD5HashedSaslClientFactory
+ANONYMOUS=org.apache.qpid.client.security.anonymous.AnonymousSaslClientFactory

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernameHashedPasswordCallbackHandler.java Fri Oct 21 14:42:12 2011
@@ -20,30 +20,29 @@
  */
 package org.apache.qpid.client.security;
 
-import org.apache.qpid.client.protocol.AMQProtocolSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
+import org.apache.qpid.jms.ConnectionURL;
 
 public class UsernameHashedPasswordCallbackHandler implements AMQCallbackHandler
 {
-    private static final Logger _logger = LoggerFactory.getLogger(UsernameHashedPasswordCallbackHandler.class);
+    private ConnectionURL _connectionURL;
 
-    private AMQProtocolSession _protocolSession;
-
-    public void initialise(AMQProtocolSession protocolSession)
+    /**
+     * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
+     */
+    @Override
+    public void initialise(ConnectionURL connectionURL)
     {
-        _protocolSession = protocolSession;
+        _connectionURL = connectionURL;
     }
 
     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -53,13 +52,13 @@ public class UsernameHashedPasswordCallb
             Callback cb = callbacks[i];
             if (cb instanceof NameCallback)
             {
-                ((NameCallback) cb).setName(_protocolSession.getUsername());
+                ((NameCallback) cb).setName(_connectionURL.getUsername());
             }
             else if (cb instanceof PasswordCallback)
             {
                 try
                 {
-                    ((PasswordCallback) cb).setPassword(getHash(_protocolSession.getPassword()));
+                    ((PasswordCallback) cb).setPassword(getHash(_connectionURL.getPassword()));
                 }
                 catch (NoSuchAlgorithmException e)
                 {
@@ -99,4 +98,5 @@ public class UsernameHashedPasswordCallb
 
         return hash;
     }
+
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/security/UsernamePasswordCallbackHandler.java Fri Oct 21 14:42:12 2011
@@ -27,15 +27,19 @@ import javax.security.auth.callback.Name
 import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 
-import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.jms.ConnectionURL;
 
 public class UsernamePasswordCallbackHandler implements AMQCallbackHandler
 {
-    private AMQProtocolSession _protocolSession;
+    private ConnectionURL _connectionURL;
 
-    public void initialise(AMQProtocolSession protocolSession)
+    /**
+     * @see org.apache.qpid.client.security.AMQCallbackHandler#initialise(org.apache.qpid.jms.ConnectionURL)
+     */
+    @Override
+    public void initialise(final ConnectionURL connectionURL)
     {
-        _protocolSession = protocolSession;
+        _connectionURL = connectionURL;
     }
 
     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
@@ -45,11 +49,11 @@ public class UsernamePasswordCallbackHan
             Callback cb = callbacks[i];
             if (cb instanceof NameCallback)
             {
-                ((NameCallback)cb).setName(_protocolSession.getUsername());
+                ((NameCallback)cb).setName(_connectionURL.getUsername());
             }
             else if (cb instanceof PasswordCallback)
             {
-                ((PasswordCallback)cb).setPassword(_protocolSession.getPassword().toCharArray());
+                ((PasswordCallback)cb).setPassword(_connectionURL.getPassword().toCharArray());
             }
             else
             {
@@ -57,4 +61,5 @@ public class UsernamePasswordCallbackHan
             }
         }
     }
+
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Oct 21 14:42:12 2011
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Set;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.io.IOException;
 
 /**
  * The state manager is responsible for managing the state of the protocol session. <p/>
@@ -48,7 +47,7 @@ import java.io.IOException;
  *
  * The two step process is required as there is an inherit race condition between starting a process that will cause
  * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
- * that any asynchrous errors that occur can be delivered to the correct waiters.
+ * that any asynchronous errors that occur can be delivered to the correct waiters.
  */
 public class AMQStateManager implements AMQMethodListener
 {
@@ -84,7 +83,10 @@ public class AMQStateManager implements 
 
     public AMQState getCurrentState()
     {
-        return _currentState;
+        synchronized (_stateLock)
+        {
+            return _currentState;
+        }
     }
 
     public void changeState(AMQState newState)
@@ -114,7 +116,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+     * Setting of the ProtocolSession will be required when Failover has been successfully completed.
      *
      * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
      * connection to the network.
@@ -131,9 +133,9 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Propogate error to waiters
+     * Propagate error to waiters
      *
-     * @param error The error to propogate.
+     * @param error The error to propagate.
      */
     public void error(Exception error)
     {
@@ -177,7 +179,7 @@ public class AMQStateManager implements 
     }
 
     /**
-     * Create and add a new waiter to the notifcation list.
+     * Create and add a new waiter to the notification list.
      *
      * @param states The waiter will attempt to wait for one of these desired set states to be achived.
      *

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Fri Oct 21 14:42:12 2011
@@ -34,7 +34,7 @@ import java.util.Set;
  *
  * On construction the current state and a set of States to await for is provided.
  *
- * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * When await() is called the state at construction is compared against the awaitStates. If the state at construction is
  * a desired state then await() returns immediately.
  *
  * Otherwise it will block for the set timeout for a desired state to be achieved.
@@ -48,9 +48,9 @@ public class StateWaiter extends Blockin
 {
     private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
 
-    Set<AMQState> _awaitStates;
-    private AMQState _startState;
-    private AMQStateManager _stateManager;
+    private final Set<AMQState> _awaitStates;
+    private final AMQState _startState;
+    private final AMQStateManager _stateManager;
 
     /**
      *
@@ -78,9 +78,9 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved within the default timeout.
+     * Await for the required State to be achieved within the default timeout.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await() throws AMQException
     {
@@ -88,13 +88,13 @@ public class StateWaiter extends Blockin
     }
 
     /**
-     * Await for the requried State to be achieved.
+     * Await for the required State to be achieved.
      *
      * <b>It is the responsibility of this class to remove the waiter from the StateManager
      *
-     * @param timeout The time in milliseconds to wait for any of the states to be achived.
+     * @param timeout The time in milliseconds to wait for any of the states to be achieved.
      * @return The achieved state that was requested.
-     * @throws AMQException The exception that prevented the required state from being achived.
+     * @throws AMQException The exception that prevented the required state from being achieved.
      */
     public AMQState await(long timeout) throws AMQException
     {

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/url/URLParser.java Fri Oct 21 14:42:12 2011
@@ -45,7 +45,7 @@ public class URLParser
     private void parseURL(String fullURL) throws URLSyntaxException
     {
         // Connection URL format
-        // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';vm://:3/virtualpath?option=\'value\'',failover='method?option=\'value\',option='value''"
+        // amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\',option=\'value\';tcp://host:port?option=\'value\'',failover='method?option=\'value\',option='value''"
         // Options are of course optional except for requiring a single broker in the broker list.
         try
         {
@@ -195,7 +195,7 @@ public class URLParser
         {
             String brokerlist = _url.getOptions().get(AMQConnectionURL.OPTIONS_BROKERLIST);
 
-            // brokerlist tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'
+            // brokerlist tcp://host:port?option='value',option='value';tcp://host:port/virtualpath?option='value'
             StringTokenizer st = new StringTokenizer(brokerlist, "" + URLHelper.BROKER_SEPARATOR);
 
             while (st.hasMoreTokens())

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java Fri Oct 21 14:42:12 2011
@@ -28,9 +28,8 @@ import java.util.concurrent.locks.Reentr
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * BlockingWaiter is a 'rendezvous' which delegates handling of
@@ -64,6 +63,8 @@ import org.apache.qpid.protocol.AMQMetho
  */
 public abstract class BlockingWaiter<T>
 {
+    private static final Logger _logger = LoggerFactory.getLogger(BlockingWaiter.class);
+
     /** This flag is used to indicate that the blocked for method has been received. */
     private volatile boolean _ready = false;
 
@@ -180,7 +181,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e.getMessage(), e);
                         // IGNORE    -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
                         // if (!_ready && timeout != -1)
                         // {
@@ -228,12 +229,12 @@ public abstract class BlockingWaiter<T>
     }
 
     /**
-     * This is a callback, called when an error has occured that should interupt any waiter.
+     * This is a callback, called when an error has occurred that should interrupt any waiter.
      * It is also called from within this class to avoid code repetition but it should only be called by the MINA threads.
      *
      * Once closed any notification of an exception will be ignored.
      *
-     * @param e The exception being propogated.
+     * @param e The exception being propagated.
      */
     public void error(Exception e)
     {
@@ -255,7 +256,7 @@ public abstract class BlockingWaiter<T>
             }
             else
             {
-                System.err.println("WARNING: new error '" + e == null ? "null" : e.getMessage() + "' arrived while old one not yet processed:" + _error.getMessage());
+                _logger.error("WARNING: new error '" + (e == null ? "null" : e.getMessage()) + "' arrived while old one not yet processed:" + _error.getMessage());
             }
 
             if (_waiting.get())
@@ -272,7 +273,7 @@ public abstract class BlockingWaiter<T>
                     }
                     catch (InterruptedException e1)
                     {
-                        System.err.println(e.getMessage());
+                        _logger.error(e1.getMessage(), e1);
                     }
                 }
                 _errorAck = false;

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Fri Oct 21 14:42:12 2011
@@ -37,9 +37,9 @@ public class JMSSelectorFilter implement
     public JMSSelectorFilter(String selector) throws AMQInternalException
     {
         _selector = selector;
-        if (JMSSelectorFilter._logger.isDebugEnabled())
+        if (_logger.isDebugEnabled())
         {
-            JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+            _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
         }
         _matcher = new SelectorParser().parse(selector);
     }
@@ -49,16 +49,16 @@ public class JMSSelectorFilter implement
         try
         {
             boolean match = _matcher.matches(message);
-            if (JMSSelectorFilter._logger.isDebugEnabled())
+            if (_logger.isDebugEnabled())
             {
-                JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
+                _logger.debug(message + " match(" + match + ") selector(" + System
                         .identityHashCode(_selector) + "):" + _selector);
             }
             return match;
         }
         catch (AMQInternalException e)
         {
-            JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message  " + message, e);
+            _logger.warn("Caught exception when evaluating message selector for message  " + message, e);
         }
         return false;
     }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/filter/PropertyExpression.java Fri Oct 21 14:42:12 2011
@@ -19,6 +19,7 @@ package org.apache.qpid.filter;
 
 import java.util.HashMap;
 
+import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 
 import org.apache.qpid.AMQInternalException;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
 public class PropertyExpression implements Expression
 {
     // Constants - defined the same as JMS
-    private static final int NON_PERSISTENT = 1;
+    private static enum JMSDeliveryMode { NON_PERSISTENT, PERSISTENT }
     private static final int DEFAULT_PRIORITY = 4;
 
     private static final Logger _logger = LoggerFactory.getLogger(PropertyExpression.class);
@@ -79,22 +80,24 @@ public class PropertyExpression implemen
                                      {
                                          public Object evaluate(AbstractJMSMessage message)
                                          {
+
+                                             JMSDeliveryMode mode = JMSDeliveryMode.NON_PERSISTENT;
                                              try
                                              {
-                                                 int mode = message.getJMSDeliveryMode();
+                                                 mode = message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ?
+                                                         JMSDeliveryMode.PERSISTENT : JMSDeliveryMode.NON_PERSISTENT;
+
                                                  if (_logger.isDebugEnabled())
                                                  {
                                                      _logger.debug("JMSDeliveryMode is :" + mode);
                                                  }
-
-                                                 return mode;
                                              }
                                              catch (JMSException e)
                                              {
                                                  _logger.warn("Error evaluating property",e);
                                              }
 
-                                             return NON_PERSISTENT;
+                                             return mode.toString();
                                          }
                                      });
 

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Oct 21 14:42:12 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.jms;
 
 import java.util.Map;
 
-import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.transport.ConnectionSettings;
 
 public interface BrokerDetails
 {
@@ -52,9 +52,7 @@ public interface BrokerDetails
     
     public static final int DEFAULT_PORT = 5672;
 
-    public static final String SOCKET = "socket";
     public static final String TCP = "tcp";
-    public static final String VM = "vm";
 
     public static final String DEFAULT_TRANSPORT = TCP;
 
@@ -106,14 +104,12 @@ public interface BrokerDetails
     long getTimeout();
 
     void setTimeout(long timeout);
-
-    SSLConfiguration getSSLConfiguration();
-
-    void setSSLConfiguration(SSLConfiguration sslConfiguration);
     
     boolean getBooleanProperty(String propName);
 
     String toString();
 
     boolean equals(Object o);
+
+    ConnectionSettings buildConnectionSettings();
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Oct 21 14:42:12 2011
@@ -27,7 +27,7 @@ import java.util.List;
 
 /**
  Connection URL format
- amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';vm://:3/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
+ amqp://[user:pass@][clientid]/virtualhost?brokerlist='tcp://host:port?option=\'value\'&option=\'value\';tcp://host:port/virtualpath?option=\'value\''&failover='method?option=\'value\'&option='value''"
  Options are of course optional except for requiring a single broker in the broker list.
  The option seperator is defined to be either '&' or ','
   */

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java Fri Oct 21 14:42:12 2011
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.jms;
 
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.jms.failover.FailoverExchangeMethod;
 import org.apache.qpid.jms.failover.FailoverMethod;
 import org.apache.qpid.jms.failover.FailoverRoundRobinServers;
@@ -51,7 +50,7 @@ public class FailoverPolicy
     private long _lastMethodTime;
     private long _lastFailTime;
 
-    public FailoverPolicy(ConnectionURL connectionDetails, AMQConnection conn)
+    public FailoverPolicy(ConnectionURL connectionDetails, Connection conn)
     {
         FailoverMethod method;
 
@@ -83,7 +82,7 @@ public class FailoverPolicy
              */
             if (failoverMethod.equals(FailoverMethod.SINGLE_BROKER))
             {
-                method = new FailoverRoundRobinServers(connectionDetails);
+                method = new FailoverSingleServer(connectionDetails);
             }
             else
             {

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/MessageProducer.java Fri Oct 21 14:42:12 2011
@@ -51,7 +51,4 @@ public interface MessageProducer extends
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException;
 
-    void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException;
-
 }

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Fri Oct 21 14:42:12 2011
@@ -32,9 +32,9 @@ import javax.jms.Session;
 
 import org.apache.qpid.client.AMQAnyDestination;
 import org.apache.qpid.client.AMQBrokerDetails;
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
 import org.apache.qpid.jms.ConnectionURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +58,7 @@ public class FailoverExchangeMethod impl
     private static final Logger _logger = LoggerFactory.getLogger(FailoverExchangeMethod.class);
    
     /** This is not safe to use until attainConnection is called */
-    private AMQConnection _conn;
+    private Connection _conn;
     
     /** Protects the broker list when modifications happens */
     private Object _brokerListLock = new Object();
@@ -80,7 +80,7 @@ public class FailoverExchangeMethod impl
     /** Denotes the number of failed attempts **/
     private int _failedAttemps = 0;
     
-    public FailoverExchangeMethod(ConnectionURL connectionDetails, AMQConnection conn)
+    public FailoverExchangeMethod(ConnectionURL connectionDetails, Connection conn)
     {
         _connectionDetails = connectionDetails;
         _originalBrokerDetail = _connectionDetails.getBrokerDetails(0);
@@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
                         broker.setHost(tokens[1]);
                         broker.setPort(Integer.parseInt(tokens[2]));
                         broker.setProperties(_originalBrokerDetail.getProperties());
-                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
                         brokerList.add(broker);
                         
                         if (currentBrokerIP.equals(broker.getHost()) && 

Modified: qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/branches/QPID-2519/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Oct 21 14:42:12 2011
@@ -36,6 +36,7 @@ import javax.jms.Queue;
 import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.NamingException;
+import javax.naming.ConfigurationException;
 import javax.naming.spi.InitialContextFactory;
 
 import org.apache.qpid.client.AMQConnectionFactory;
@@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
         return new ReadOnlyContext(environment, data);
     }
 
-    protected void createConnectionFactories(Map data, Hashtable environment)
+    protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException
     {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
         {
@@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
         }
     }
 
-    protected void createDestinations(Map data, Hashtable environment)
+    protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException
     {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
         {
@@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
     /**
      * Factory method to create new Connection Factory instances
      */
-    protected ConnectionFactory createFactory(String url)
+    protected ConnectionFactory createFactory(String url) throws ConfigurationException
     {
         try
         {
@@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
         }
         catch (URLSyntaxException urlse)
         {
-            _logger.warn("Unable to createFactories:" + urlse);
-        }
+            _logger.warn("Unable to create factory:" + urlse);
 
-        return null;
+            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + url + " due to : " +  urlse.getMessage());
+            ex.initCause(urlse);
+            throw ex;
+        }
     }
 
     /**
      * Factory method to create new Destination instances from an AMQP BindingURL
      */
-    protected Destination createDestination(String str)
+    protected Destination createDestination(String str) throws ConfigurationException
     {
         try
         {
@@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
         {
             _logger.warn("Unable to create destination:" + e, e);
 
-            return null;
+            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " +  e.getMessage());
+            ex.initCause(e);
+            throw ex;
         }
     }
 

Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Fri Oct 21 14:42:12 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.url.URLSyntaxException;
 
@@ -37,53 +36,18 @@ public class MockAMQConnection extends A
         super(broker, username, password, clientName, virtualHost);
     }
 
-    public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(broker, username, password, clientName, virtualHost, sslConfig);
-    }
-
     public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
             throws AMQException, URLSyntaxException
     {
         super(host, port, username, password, clientName, virtualHost);
     }
 
-    public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(host, port, username, password, clientName, virtualHost, sslConfig);
-    }
-
-    public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig);
-    }
-
     public MockAMQConnection(String connection)
             throws AMQException, URLSyntaxException
     {
         super(connection);
     }
 
-    public MockAMQConnection(String connection, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(connection, sslConfig);
-    }
-
-    public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig)
-            throws AMQException
-    {
-        super(connectionURL, sslConfig);
-    }
-
-    protected MockAMQConnection(String username, String password, String clientName, String virtualHost)
-    {
-        super(username, password, clientName, virtualHost);
-    }
-
     @Override
     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
     {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org