You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC

svn commit: r1172657 [14/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/csha...

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Sep 19 15:13:18 2011
@@ -23,7 +23,8 @@ package org.apache.qpid.client.message;
 import javax.jms.JMSException;
 import javax.jms.StreamMessage;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -36,65 +37,76 @@ public class JMSStreamMessage extends Ab
     public static final String MIME_TYPE="jms/stream-message";
 
 
-
-    /**
-     * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
-     * a byte array in multiple chunks, hence this is used to track how much is left to be read
-     */
-    private int _byteArrayRemaining = -1;
+    private TypedBytesContentReader _typedBytesContentReader;
+    private TypedBytesContentWriter _typedBytesContentWriter;
 
     public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(delegateFactory,null);
+        super(delegateFactory,false);
+        _typedBytesContentWriter = new TypedBytesContentWriter();
 
     }
 
-    /**
-     * Construct a stream message with existing data.
-     *
-     * @param delegateFactory
-     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     */
-    JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
-    {
 
-        super(delegateFactory, data); // this instanties a content header
-    }
 
     JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-
-        super(delegate, data);
+        super(delegate, data!=null);
+        _typedBytesContentReader = new TypedBytesContentReader(data);
     }
 
-
     public void reset()
     {
-        super.reset();
         _readableMessage = true;
+
+        if(_typedBytesContentReader != null)
+        {
+            _typedBytesContentReader.reset();
+        }
+        else if (_typedBytesContentWriter != null)
+        {
+            _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
+        }
+    }
+
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _typedBytesContentReader = null;
+        _typedBytesContentWriter = new TypedBytesContentWriter();
+
     }
 
+
     protected String getMimeType()
     {
         return MIME_TYPE;
     }
 
-
+    @Override
+    public java.nio.ByteBuffer getData() throws JMSException
+    {
+        return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
+    }
 
     public boolean readBoolean() throws JMSException
     {
-        return super.readBoolean();
+        checkReadable();
+        return _typedBytesContentReader.readBoolean();
     }
 
 
     public byte readByte() throws JMSException
     {
-        return super.readByte();
+        checkReadable();
+        return _typedBytesContentReader.readByte();
     }
 
     public short readShort() throws JMSException
     {
-        return super.readShort();
+        checkReadable();
+        return _typedBytesContentReader.readShort();
     }
 
     /**
@@ -105,102 +117,127 @@ public class JMSStreamMessage extends Ab
      */
     public char readChar() throws JMSException
     {
-        return super.readChar();
+        checkReadable();
+        return _typedBytesContentReader.readChar();
     }
 
     public int readInt() throws JMSException
     {
-        return super.readInt();
+        checkReadable();
+        return _typedBytesContentReader.readInt();
     }
 
     public long readLong() throws JMSException
     {
-        return super.readLong();
+        checkReadable();
+        return _typedBytesContentReader.readLong();
     }
 
     public float readFloat() throws JMSException
     {
-        return super.readFloat();
+        checkReadable();
+        return _typedBytesContentReader.readFloat();
     }
 
     public double readDouble() throws JMSException
     {
-        return super.readDouble();
+        checkReadable();
+        return _typedBytesContentReader.readDouble();
     }
 
     public String readString() throws JMSException
     {
-        return super.readString();
+        checkReadable();
+        return _typedBytesContentReader.readString();
     }
 
     public int readBytes(byte[] bytes) throws JMSException
     {
-        return super.readBytes(bytes);
+        if(bytes == null)
+        {
+            throw new IllegalArgumentException("Must provide non-null array to read into");
+        }
+
+        checkReadable();
+        return _typedBytesContentReader.readBytes(bytes);
     }
 
 
     public Object readObject() throws JMSException
     {
-        return super.readObject();
+        checkReadable();
+        return _typedBytesContentReader.readObject();
     }
 
     public void writeBoolean(boolean b) throws JMSException
     {
-        super.writeBoolean(b);
+        checkWritable();
+        _typedBytesContentWriter.writeBoolean(b);
     }
 
     public void writeByte(byte b) throws JMSException
     {
-        super.writeByte(b);
+        checkWritable();
+        _typedBytesContentWriter.writeByte(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
-        super.writeShort(i);
+        checkWritable();
+        _typedBytesContentWriter.writeShort(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
-        super.writeChar(c);
+        checkWritable();
+        _typedBytesContentWriter.writeChar(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
-        super.writeInt(i);
+        checkWritable();
+        _typedBytesContentWriter.writeInt(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
-        super.writeLong(l);
+        checkWritable();
+        _typedBytesContentWriter.writeLong(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
-        super.writeFloat(v);
+        checkWritable();
+        _typedBytesContentWriter.writeFloat(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
-        super.writeDouble(v);
+        checkWritable();
+        _typedBytesContentWriter.writeDouble(v);
     }
 
     public void writeString(String string) throws JMSException
     {
-        super.writeString(string);
+        checkWritable();
+        _typedBytesContentWriter.writeString(string);
     }
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        super.writeBytes(bytes);
+        checkWritable();
+        _typedBytesContentWriter.writeBytes(bytes);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
-        super.writeBytes(bytes,offset,length);
+        checkWritable();
+        _typedBytesContentWriter.writeBytes(bytes, offset, length);
     }
 
     public void writeObject(Object object) throws JMSException
     {
-        super.writeObject(object);
+        checkWritable();
+        _typedBytesContentWriter.writeObject(object);
     }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -22,10 +22,9 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
 {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Mon Sep 19 15:13:18 2011
@@ -20,15 +20,21 @@
  */
 package org.apache.qpid.client.message;
 
+import java.io.DataInputStream;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CharsetEncoder;
 
 import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.util.Strings;
@@ -37,6 +43,7 @@ public class JMSTextMessage extends Abst
 {
     private static final String MIME_TYPE = "text/plain";
 
+    private Exception _exception;
     private String _decodedValue;
 
     /**
@@ -45,36 +52,41 @@ public class JMSTextMessage extends Abst
     private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
     private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
-    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
-    {
-        this(delegateFactory, null, null);
-    }
+    private CharsetDecoder _decoder = DEFAULT_CHARSET.newDecoder();
+    private CharsetEncoder _encoder = DEFAULT_CHARSET.newEncoder();
+
+    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
 
-    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
+    public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        super(delegateFactory, data); // this instantiates a content header
-        setContentType(getMimeType());
-        setEncoding(encoding);
+        super(delegateFactory, false); // this instantiates a content header
     }
 
     JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
             throws AMQException
     {
-        super(delegate, data);
-        setContentType(getMimeType());
-        _data = data;
-    }
+        super(delegate, data!=null);
 
-
-    public void clearBodyImpl() throws JMSException
-    {
-        if (_data != null)
+        try
         {
-            _data.release();
-            _data = null;
+            if(propertyExists(PAYLOAD_NULL_PROPERTY))
+            {
+                _decodedValue = null;
+            }
+            else
+            {
+                _decodedValue = _decoder.decode(data).toString();
+            }
+        }
+        catch (CharacterCodingException e)
+        {
+            _exception = e;
+        }
+        catch (JMSException e)
+        {
+            _exception = e;
         }
 
-        _decodedValue = null;
     }
 
     public String toBodyString() throws JMSException
@@ -87,95 +99,62 @@ public class JMSTextMessage extends Abst
         return MIME_TYPE;
     }
 
-    public void setText(String text) throws JMSException
+    @Override
+    public ByteBuffer getData() throws JMSException
     {
-        checkWritable();
-
-        clearBody();
+        _encoder.reset();
         try
         {
-            if (text != null)
+            if(_exception != null)
+            {
+                final MessageFormatException messageFormatException = new MessageFormatException("Cannot decode original message");
+                messageFormatException.setLinkedException(_exception);
+                throw messageFormatException;
+            }
+            else if(_decodedValue == null)
+            {
+                return EMPTY_BYTE_BUFFER;
+            }
+            else
             {
-                final String encoding = getEncoding();
-                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
-                {
-                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
-                    setEncoding("UTF-8");
-                }
-                else
-                {
-                    _data = ByteBuffer.wrap(text.getBytes(encoding));
-                }
-                _data.position(_data.limit());
-                _changedData=true;
+                return _encoder.encode(CharBuffer.wrap(_decodedValue));
             }
-            _decodedValue = text;
         }
-        catch (UnsupportedEncodingException e)
+        catch (CharacterCodingException e)
         {
-            // should never occur
-            JMSException jmse = new JMSException("Unable to decode text data");
-            jmse.setLinkedException(e);
-            jmse.initCause(e);
-            throw jmse;
+            final JMSException jmsException = new JMSException("Cannot encode string in UFT-8: " + _decodedValue);
+            jmsException.setLinkedException(e);
+            throw jmsException;
         }
     }
 
-    public String getText() throws JMSException
+    @Override
+    public void clearBody() throws JMSException
     {
-        if (_data == null && _decodedValue == null)
-        {
-            return null;
-        }
-        else if (_decodedValue != null)
-        {
-            return _decodedValue;
-        }
-        else
-        {
-            _data.rewind();
+        super.clearBody();
+        _decodedValue = null;
+        _exception = null;
+    }
 
-            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
-            {
-                return null;
-            }
-            if (getEncoding() != null)
-            {
-                try
-                {
-                    _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
-                }
-                catch (CharacterCodingException e)
-                {
-                    JMSException jmse = new JMSException("Could not decode string data: " + e);
-                    jmse.setLinkedException(e);
-                    jmse.initCause(e);
-                    throw jmse;
-                }
-            }
-            else
-            {
-                try
-                {
-                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
-                }
-                catch (CharacterCodingException e)
-                {
-                    JMSException jmse = new JMSException("Could not decode string data: " + e);
-                    jmse.setLinkedException(e);
-                    jmse.initCause(e);
-                    throw jmse;
-                }
-            }
-            return _decodedValue;
-        }
+    public void setText(String text) throws JMSException
+    {
+        checkWritable();
+
+        clearBody();
+        _decodedValue = text;
+
+    }
+
+    public String getText() throws JMSException
+    {
+        return _decodedValue;
     }
 
     @Override
     public void prepareForSending() throws JMSException
     {
         super.prepareForSending();
-        if (_data == null)
+        if (_decodedValue == null)
         {
             setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
         }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.client.message;
 
 import javax.jms.JMSException;
 
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Mon Sep 19 15:13:18 2011
@@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
     public void receiveBody(ContentBody body)
     {
 
-        if (body.payload != null)
+        if (body._payload != null)
         {
-            final long payloadSize = body.payload.remaining();
+            final long payloadSize = body._payload.length;
 
             if (_bodies == null)
             {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,7 +33,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.filter.codec.ProtocolCodecException;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -46,6 +47,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
@@ -57,8 +59,6 @@ import org.apache.qpid.framing.Heartbeat
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.pool.Job;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -164,19 +164,19 @@ public class AMQProtocolHandler implemen
     private FailoverException _lastFailoverException;
 
     /** Defines the default timeout to use for synchronous protocol commands. */
-    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong("amqj.default_syncwrite_timeout", 1000 * 30);
+    private final long DEFAULT_SYNC_TIMEOUT = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                                           Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                                        ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
     private AMQCodecFactory _codecFactory;
-    private Job _readJob;
-    private Job _writeJob;
-    private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+
     private ProtocolVersion _suggestedProtocolVersion;
 
     private long _writtenBytes;
     private long _readBytes;
-    private NetworkTransport _transport;
+
     private NetworkConnection _network;
     private Sender<ByteBuffer> _sender;
 
@@ -191,24 +191,6 @@ public class AMQProtocolHandler implemen
         _protocolSession = new AMQProtocolSession(this, _connection);
         _stateManager = new AMQStateManager(_protocolSession);
         _codecFactory = new AMQCodecFactory(false, _protocolSession);
-        _poolReference.setThreadFactory(new ThreadFactory()
-        {
-
-            public Thread newThread(final Runnable runnable)
-            {
-                try
-                {
-                    return Threading.getThreadFactory().createThread(runnable);
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException("Failed to create thread", e);
-                }
-            }
-        });
-        _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
-        _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
-        _poolReference.acquireExecutorService();
         _failoverHandler = new FailoverHandler(this);
     }
 
@@ -329,17 +311,7 @@ public class AMQProtocolHandler implemen
             }
             else
             {
-
-                if (cause instanceof ProtocolCodecException)
-                {
-                    _logger.info("Protocol Exception caught NOT going to attempt failover as " +
-                                 "cause isn't AMQConnectionClosedException: " + cause, cause);
-
-                    AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
-                    propagateExceptionToAllWaiters(amqe);
-                }
                 _connection.exceptionReceived(cause);
-
             }
 
             // FIXME Need to correctly handle other exceptions. Things like ...
@@ -433,76 +405,63 @@ public class AMQProtocolHandler implemen
 
     public void received(ByteBuffer msg)
     {
+        _readBytes += msg.remaining();
         try
         {
-            _readBytes += msg.remaining();
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
 
-            Job.fireAsynchEvent(_poolReference.getPool(), _readJob, new Runnable()
+            // Decode buffer
+
+            for (AMQDataBlock message : dataBlocks)
             {
 
-                public void run()
-                {
-                    // Decode buffer
+                    if (PROTOCOL_DEBUG)
+                    {
+                        _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+                    }
 
-                    for (AMQDataBlock message : dataBlocks)
+                    if(message instanceof AMQFrame)
                     {
+                        final boolean debug = _logger.isDebugEnabled();
+                        final long msgNumber = ++_messageReceivedCount;
 
-                        try
-                        {
-                            if (PROTOCOL_DEBUG)
-                            {
-                                _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
-                            }
-
-                            if(message instanceof AMQFrame)
-                            {
-                                final boolean debug = _logger.isDebugEnabled();
-                                final long msgNumber = ++_messageReceivedCount;
-
-                                if (debug && ((msgNumber % 1000) == 0))
-                                {
-                                    _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-                                }
-
-                                AMQFrame frame = (AMQFrame) message;
-
-                                final AMQBody bodyFrame = frame.getBodyFrame();
-
-                                HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
-
-                                bodyFrame.handle(frame.getChannel(), _protocolSession);
-
-                                _connection.bytesReceived(_readBytes);
-                            }
-                            else if (message instanceof ProtocolInitiation)
-                            {
-                                // We get here if the server sends a response to our initial protocol header
-                                // suggesting an alternate ProtocolVersion; the server will then close the
-                                // connection.
-                                ProtocolInitiation protocolInit = (ProtocolInitiation) message;
-                                _suggestedProtocolVersion = protocolInit.checkVersion();
-                                _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
-                                
-                                // get round a bug in old versions of qpid whereby the connection is not closed
-                                _stateManager.changeState(AMQState.CONNECTION_CLOSED);
-                            }
-                        }
-                        catch (Exception e)
+                        if (debug && ((msgNumber % 1000) == 0))
                         {
-                            _logger.error("Exception processing frame", e);
-                            propagateExceptionToFrameListeners(e);
-                            exception(e);
+                            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
                         }
+
+                        AMQFrame frame = (AMQFrame) message;
+
+                        final AMQBody bodyFrame = frame.getBodyFrame();
+
+                        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+
+                        bodyFrame.handle(frame.getChannel(), _protocolSession);
+
+                        _connection.bytesReceived(_readBytes);
+                    }
+                    else if (message instanceof ProtocolInitiation)
+                    {
+                        // We get here if the server sends a response to our initial protocol header
+                        // suggesting an alternate ProtocolVersion; the server will then close the
+                        // connection.
+                        ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+                        _suggestedProtocolVersion = protocolInit.checkVersion();
+                        _logger.info("Broker suggested using protocol version:" + _suggestedProtocolVersion);
+
+                        // get round a bug in old versions of qpid whereby the connection is not closed
+                        _stateManager.changeState(AMQState.CONNECTION_CLOSED);
                     }
                 }
-            });
         }
         catch (Exception e)
         {
+            _logger.error("Exception processing frame", e);
             propagateExceptionToFrameListeners(e);
             exception(e);
         }
+
+
     }
 
     public void methodBodyReceived(final int channelId, final AMQBody bodyFrame)
@@ -568,17 +527,13 @@ public class AMQProtocolHandler implemen
         writeFrame(frame, false);
     }
 
-    public void writeFrame(AMQDataBlock frame, boolean wait)
+    public  synchronized void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        final ByteBuffer buf = frame.toNioByteBuffer();
+        final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
-        Job.fireAsynchEvent(_poolReference.getPool(), _writeJob, new Runnable()
-        {
-            public void run()
-            {
-                _sender.send(buf);
-            }
-        });
+        _sender.send(buf);
+        _sender.flush();
+
         if (PROTOCOL_DEBUG)
         {
             _protocolLogger.debug(String.format("SEND: [%s] %s", this, frame));
@@ -595,12 +550,41 @@ public class AMQProtocolHandler implemen
 
         _connection.bytesSent(_writtenBytes);
 
-        if (wait)
+    }
+
+    private ByteBuffer asByteBuffer(AMQDataBlock block)
+    {
+        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+        try
+        {
+            block.writePayload(new DataOutputStream(new OutputStream()
+            {
+
+
+                @Override
+                public void write(int b) throws IOException
+                {
+                    buf.put((byte) b);
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException
+                {
+                    buf.put(b, off, len);
+                }
+            }));
+        }
+        catch (IOException e)
         {
-            _sender.flush();
+            throw new RuntimeException(e);
         }
+
+        buf.flip();
+        return buf;
     }
 
+
     /**
      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
      * calling getProtocolSession().write() then waiting for the response.
@@ -723,7 +707,7 @@ public class AMQProtocolHandler implemen
                 _logger.debug("FailoverException interrupted connection close, ignoring as connection   close anyway.");
             }
         }
-        _poolReference.releaseExecutorService();
+
     }
 
     /** @return the number of bytes read from this protocol session */
@@ -841,8 +825,13 @@ public class AMQProtocolHandler implemen
 
     public void setNetworkConnection(NetworkConnection network)
     {
+        setNetworkConnection(network, network.getSender());
+    }
+
+    public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+    {
         _network = network;
-        _sender = network.getSender();
+        _sender = sender;
     }
 
     /** @param delay delay in seconds (not ms) */

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.java Mon Sep 19 15:13:18 2011
@@ -20,17 +20,22 @@
  */
 package org.apache.qpid.client.security;
 
-import org.apache.qpid.util.FileUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.TreeMap;
+
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CallbackHandlerRegistry is a registry for call back handlers for user authentication and interaction during user
@@ -42,7 +47,7 @@ import java.util.Properties;
  * "amp.callbackhandler.properties". The format of the properties file is:
  *
  * <p/><pre>
- * CallbackHanlder.mechanism=fully.qualified.class.name
+ * mechanism.n=fully.qualified.class.name where n is an ordinal
  * </pre>
  *
  * <p/>Where mechanism is an IANA-registered mechanism name and the fully qualified class name refers to a
@@ -66,51 +71,15 @@ public class CallbackHandlerRegistry
     public static final String DEFAULT_RESOURCE_NAME = "org/apache/qpid/client/security/CallbackHandlerRegistry.properties";
 
     /** A static reference to the singleton instance of this registry. */
-    private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+    private static final CallbackHandlerRegistry _instance;
 
     /** Holds a map from SASL mechanism names to call back handlers. */
-    private Map<String, Class> _mechanismToHandlerClassMap = new HashMap<String, Class>();
-
-    /** Holds a space delimited list of mechanisms that callback handlers exist for. */
-    private String _mechanisms;
-
-    /**
-     * Gets the singleton instance of this registry.
-     *
-     * @return The singleton instance of this registry.
-     */
-    public static CallbackHandlerRegistry getInstance()
-    {
-        return _instance;
-    }
+    private Map<String, Class<AMQCallbackHandler>> _mechanismToHandlerClassMap = new HashMap<String, Class<AMQCallbackHandler>>();
 
-    /**
-     * Gets the callback handler class for a given SASL mechanism name.
-     *
-     * @param mechanism The SASL mechanism name.
-     *
-     * @return The callback handler class for the mechanism, or null if none is configured for that mechanism.
-     */
-    public Class getCallbackHandlerClass(String mechanism)
-    {
-        return (Class) _mechanismToHandlerClassMap.get(mechanism);
-    }
+    /** Ordered collection of mechanisms for which callback handlers exist. */
+    private Collection<String> _mechanisms;
 
-    /**
-     * Gets a space delimited list of supported SASL mechanisms.
-     *
-     * @return A space delimited list of supported SASL mechanisms.
-     */
-    public String getMechanisms()
-    {
-        return _mechanisms;
-    }
-
-    /**
-     * Creates the call back handler registry from its configuration resource or file. This also has the side effect
-     * of configuring and registering the SASL client factory implementations using {@link DynamicSaslRegistrar}.
-     */
-    private CallbackHandlerRegistry()
+    static
     {
         // Register any configured SASL client factories.
         DynamicSaslRegistrar.registerSaslProviders();
@@ -120,12 +89,12 @@ public class CallbackHandlerRegistry
             FileUtils.openFileOrDefaultResource(filename, DEFAULT_RESOURCE_NAME,
                 CallbackHandlerRegistry.class.getClassLoader());
 
+        final Properties props = new Properties();
+
         try
         {
-            Properties props = new Properties();
+
             props.load(is);
-            parseProperties(props);
-            _logger.info("Callback handlers available for SASL mechanisms: " + _mechanisms);
         }
         catch (IOException e)
         {
@@ -146,32 +115,68 @@ public class CallbackHandlerRegistry
                 }
             }
         }
+
+        _instance = new CallbackHandlerRegistry(props);
+        _logger.info("Callback handlers available for SASL mechanisms: " + _instance._mechanisms);
+
     }
 
-    /*private InputStream openPropertiesInputStream(String filename)
+    /**
+     * Gets the singleton instance of this registry.
+     *
+     * @return The singleton instance of this registry.
+     */
+    public static CallbackHandlerRegistry getInstance()
+    {
+        return _instance;
+    }
+
+    public AMQCallbackHandler createCallbackHandler(final String mechanism)
     {
-        boolean useDefault = true;
-        InputStream is = null;
-        if (filename != null)
+        final Class<AMQCallbackHandler> mechanismClass = _mechanismToHandlerClassMap.get(mechanism);
+
+        if (mechanismClass == null)
         {
-            try
-            {
-                is = new BufferedInputStream(new FileInputStream(new File(filename)));
-                useDefault = false;
-            }
-            catch (FileNotFoundException e)
-            {
-                _logger.error("Unable to read from file " + filename + ": " + e, e);
-            }
+            throw new IllegalArgumentException("Mechanism " + mechanism + " not known");
         }
 
-        if (useDefault)
+        try
+        {
+            return mechanismClass.newInstance();
+        }
+        catch (InstantiationException e)
+        {
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
+        }
+        catch (IllegalAccessException e)
         {
-            is = CallbackHandlerRegistry.class.getResourceAsStream(DEFAULT_RESOURCE_NAME);
+            throw new IllegalArgumentException("Unable to create an instance of mechanism " + mechanism, e);
         }
+    }
 
-        return is;
-    }*/
+    /**
+     * Gets the collection of supported SASL mechanism names, ordered by preference.
+     *
+     * @return collection of SASL mechanism names.
+     */
+    public Collection<String> getMechanisms()
+    {
+        return Collections.unmodifiableCollection(_mechanisms);
+    }
+
+    /**
+     * Creates the call back handler registry from the supplied properties.
+     *
+     * Note that registering the SASL client factory implementations using {@link DynamicSaslRegistrar}
+     * is performed by the static initialiser of this class, not by this constructor.
+     *
+     * This constructor has default (package-private) access to allow for effective unit testing.  Clients must use
+     * {@link #getInstance()} to obtain the singleton instance.
+     */
+    CallbackHandlerRegistry(final Properties props)
+    {
+        parseProperties(props);
+    }
 
     /**
      * Scans the specified properties as a mapping from IANA registered SASL mechanism to call back handler
@@ -183,20 +188,20 @@ public class CallbackHandlerRegistry
      */
     private void parseProperties(Properties props)
     {
+
+        final Map<Integer, String> mechanisms = new TreeMap<Integer, String>();
+
         Enumeration e = props.propertyNames();
         while (e.hasMoreElements())
         {
-            String propertyName = (String) e.nextElement();
-            int period = propertyName.indexOf(".");
-            if (period < 0)
-            {
-                _logger.warn("Unable to parse property " + propertyName + " when configuring SASL providers");
+            final String propertyName = (String) e.nextElement();
+            final String[] parts = propertyName.split("\\.", 2);
 
-                continue;
-            }
+            checkPropertyNameFormat(propertyName, parts);
 
-            String mechanism = propertyName.substring(period + 1);
-            String className = props.getProperty(propertyName);
+            final String mechanism = parts[0];
+            final int ordinal = getPropertyOrdinal(propertyName, parts);
+            final String className = props.getProperty(propertyName);
             Class clazz = null;
             try
             {
@@ -205,20 +210,11 @@ public class CallbackHandlerRegistry
                 {
                     _logger.warn("SASL provider " + clazz + " does not implement " + AMQCallbackHandler.class
                         + ". Skipping");
-
                     continue;
                 }
-
                 _mechanismToHandlerClassMap.put(mechanism, clazz);
-                if (_mechanisms == null)
-                {
-                    _mechanisms = mechanism;
-                }
-                else
-                {
-                    // one time cost
-                    _mechanisms = _mechanisms + " " + mechanism;
-                }
+
+                mechanisms.put(ordinal, mechanism);
             }
             catch (ClassNotFoundException ex)
             {
@@ -227,5 +223,91 @@ public class CallbackHandlerRegistry
                 continue;
             }
         }
+
+        _mechanisms = mechanisms.values();  // order guaranteed by keys of treemap (i.e. our ordinals)
+
+
+    }
+
+    private void checkPropertyNameFormat(final String propertyName, final String[] parts)
+    {
+        if (parts.length != 2)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers");
+        }
+    }
+
+    private int getPropertyOrdinal(final String propertyName, final String[] parts)
+    {
+        try
+        {
+            return Integer.parseInt(parts[1]);
+        }
+        catch(NumberFormatException nfe)
+        {
+            throw new IllegalArgumentException("Unable to parse property " + propertyName + " when configuring SASL providers", nfe);
+        }
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.  If more than one
+     * mechanism is mutually available the one appearing first (by ordinal) will be returned.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+
+        return selectMechInternal(peerList, Collections.<String>emptySet());
+    }
+
+    /**
+     * Selects a SASL mechanism that is mutually available to both parties.
+     *
+     * @param peerMechanismList space separated list of mechanisms
+     * @param restrictionList space separated list of mechanisms
+     * @return selected mechanism, or null if none available
+     */
+    public String selectMechanism(final String peerMechanismList, final String restrictionList)
+    {
+        final Set<String> peerList = mechListToSet(peerMechanismList);
+        final Set<String> restrictionSet = mechListToSet(restrictionList);
+
+        return selectMechInternal(peerList, restrictionSet);
+    }
+
+    private String selectMechInternal(final Set<String> peerSet, final Set<String> restrictionSet)
+    {
+        for (final String mech : _mechanisms)
+        {
+            if (peerSet.contains(mech))
+            {
+                if (restrictionSet.isEmpty() || restrictionSet.contains(mech))
+                {
+                    return mech;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    private Set<String> mechListToSet(final String mechanismList)
+    {
+        if (mechanismList == null)
+        {
+            return Collections.emptySet();
+        }
+
+        final StringTokenizer tokenizer = new StringTokenizer(mechanismList, " ");
+        final Set<String> mechanismSet = new HashSet<String>(tokenizer.countTokens());
+        while (tokenizer.hasMoreTokens())
+        {
+            mechanismSet.add(tokenizer.nextToken());
+        }
+        return Collections.unmodifiableSet(mechanismSet);
     }
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/security/CallbackHandlerRegistry.properties Mon Sep 19 15:13:18 2011
@@ -16,7 +16,17 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-CallbackHandler.CRAM-MD5-HASHED=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
-CallbackHandler.CRAM-MD5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.AMQPLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
-CallbackHandler.PLAIN=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+
+#
+# Format:
+# <mechanism name>.<ordinal>=<implementation>
+#
+# @see CallbackHandlerRegistry
+#
+
+EXTERNAL.1=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+GSSAPI.2=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+CRAM-MD5-HASHED.3=org.apache.qpid.client.security.UsernameHashedPasswordCallbackHandler
+CRAM-MD5.4=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+AMQPLAIN.5=org.apache.qpid.client.security.UsernamePasswordCallbackHandler
+PLAIN.6=org.apache.qpid.client.security.UsernamePasswordCallbackHandler

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Mon Sep 19 15:13:18 2011
@@ -22,7 +22,7 @@ package org.apache.qpid.jms;
 
 import java.util.Map;
 
-import org.apache.qpid.client.SSLConfiguration;
+import org.apache.qpid.transport.ConnectionSettings;
 
 public interface BrokerDetails
 {
@@ -104,14 +104,12 @@ public interface BrokerDetails
     long getTimeout();
 
     void setTimeout(long timeout);
-
-    SSLConfiguration getSSLConfiguration();
-
-    void setSSLConfiguration(SSLConfiguration sslConfiguration);
     
     boolean getBooleanProperty(String propName);
 
     String toString();
 
     boolean equals(Object o);
+
+    ConnectionSettings buildConnectionSettings();
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverExchangeMethod.java Mon Sep 19 15:13:18 2011
@@ -140,7 +140,6 @@ public class FailoverExchangeMethod impl
                         broker.setHost(tokens[1]);
                         broker.setPort(Integer.parseInt(tokens[2]));
                         broker.setProperties(_originalBrokerDetail.getProperties());
-                        broker.setSSLConfiguration(_originalBrokerDetail.getSSLConfiguration());
                         brokerList.add(broker);
                         
                         if (currentBrokerIP.equals(broker.getHost()) && 

Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Mon Sep 19 15:13:18 2011
@@ -36,6 +36,7 @@ import javax.jms.Queue;
 import javax.jms.Topic;
 import javax.naming.Context;
 import javax.naming.NamingException;
+import javax.naming.ConfigurationException;
 import javax.naming.spi.InitialContextFactory;
 
 import org.apache.qpid.client.AMQConnectionFactory;
@@ -139,7 +140,7 @@ public class PropertiesFileInitialContex
         return new ReadOnlyContext(environment, data);
     }
 
-    protected void createConnectionFactories(Map data, Hashtable environment)
+    protected void createConnectionFactories(Map data, Hashtable environment) throws ConfigurationException
     {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
         {
@@ -157,7 +158,7 @@ public class PropertiesFileInitialContex
         }
     }
 
-    protected void createDestinations(Map data, Hashtable environment)
+    protected void createDestinations(Map data, Hashtable environment) throws ConfigurationException
     {
         for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
         {
@@ -225,7 +226,7 @@ public class PropertiesFileInitialContex
     /**
      * Factory method to create new Connection Factory instances
      */
-    protected ConnectionFactory createFactory(String url)
+    protected ConnectionFactory createFactory(String url) throws ConfigurationException
     {
         try
         {
@@ -233,16 +234,18 @@ public class PropertiesFileInitialContex
         }
         catch (URLSyntaxException urlse)
         {
-            _logger.warn("Unable to createFactories:" + urlse);
-        }
+            _logger.warn("Unable to create factory:" + urlse);
 
-        return null;
+            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + urlse + " due to : " +  urlse.getMessage());
+            ex.initCause(urlse);
+            throw ex;
+        }
     }
 
     /**
      * Factory method to create new Destination instances from an AMQP BindingURL
      */
-    protected Destination createDestination(String str)
+    protected Destination createDestination(String str) throws ConfigurationException
     {
         try
         {
@@ -252,7 +255,9 @@ public class PropertiesFileInitialContex
         {
             _logger.warn("Unable to create destination:" + e, e);
 
-            return null;
+            ConfigurationException ex = new ConfigurationException("Failed to parse entry: " + str + " due to : " +  e.getMessage());
+            ex.initCause(e);
+            throw ex;
         }
     }
 

Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/MockAMQConnection.java Mon Sep 19 15:13:18 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.url.URLSyntaxException;
 
@@ -37,48 +36,18 @@ public class MockAMQConnection extends A
         super(broker, username, password, clientName, virtualHost);
     }
 
-    public MockAMQConnection(String broker, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(broker, username, password, clientName, virtualHost, sslConfig);
-    }
-
     public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost)
             throws AMQException, URLSyntaxException
     {
         super(host, port, username, password, clientName, virtualHost);
     }
 
-    public MockAMQConnection(String host, int port, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(host, port, username, password, clientName, virtualHost, sslConfig);
-    }
-
-    public MockAMQConnection(String host, int port, boolean useSSL, String username, String password, String clientName, String virtualHost, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(host, port, useSSL, username, password, clientName, virtualHost, sslConfig);
-    }
-
     public MockAMQConnection(String connection)
             throws AMQException, URLSyntaxException
     {
         super(connection);
     }
 
-    public MockAMQConnection(String connection, SSLConfiguration sslConfig)
-            throws AMQException, URLSyntaxException
-    {
-        super(connection, sslConfig);
-    }
-
-    public MockAMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig)
-            throws AMQException
-    {
-        super(connectionURL, sslConfig);
-    }
-
     @Override
     public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws IOException
     {

Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Mon Sep 19 15:13:18 2011
@@ -43,4 +43,9 @@ public class TestMessageHelper
     {
         return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
     }
+
+    public static JMSObjectMessage newJMSObjectMessage()
+    {
+        return new JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Mon Sep 19 15:13:18 2011
@@ -21,10 +21,10 @@
 package org.apache.qpid.test.unit.jndi;
 
 import junit.framework.TestCase;
+
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLSyntaxException;
 
 public class ConnectionFactoryTest extends TestCase
 {
@@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
     public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
     public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
 
-    public void testConnectionURLString()
+    public void testConnectionURLStringMasksPassword() throws Exception
     {
-        AMQConnectionFactory factory = new AMQConnectionFactory();
-
-        assertNull("ConnectionURL should have no value at start",
-                   factory.getConnectionURL());
-
-        try
-        {
-            factory.setConnectionURLString(URL);
-        }
-        catch (URLSyntaxException e)
-        {
-            fail(e.getMessage());
-        }
+        AMQConnectionFactory factory = new AMQConnectionFactory(URL);
 
         //URL will be returned with the password field swapped for '********'
         assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());

Modified: qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Mon Sep 19 15:13:18 2011
@@ -24,6 +24,7 @@ import java.util.Properties;
 
 import javax.jms.Queue;
 import javax.jms.Topic;
+import javax.naming.ConfigurationException;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
@@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
             assertEquals("Topic" + i + "WithSpace",bindingKey.asString());            
         }
     }
+    
+    public void testConfigurationErrors() throws Exception
+    {
+        Properties properties = new Properties();
+        properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+        properties.put("destination.my-queue","amq.topic/test;create:always}");
+        
+        try
+        {
+            ctx = new InitialContext(properties);
+            fail("A configuration exception should be thrown with details about the address syntax error");
+        }
+        catch(ConfigurationException e)
+        {
+            assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
+        }
+        
+    }
 }

Modified: qpid/branches/qpid-3346/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common.xml?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common.xml (original)
+++ qpid/branches/qpid-3346/qpid/java/common.xml Mon Sep 19 15:13:18 2011
@@ -132,8 +132,6 @@
  	</sequential>
   </macrodef>
 
-
-
   <macrodef name="jython">
     <attribute name="path"/>
     <element name="args"/>

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Mon Sep 19 15:13:18 2011
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.codec;
 
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolEncoder;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
@@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations.
- * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
  * </table>
  */
-public class AMQCodecFactory implements ProtocolCodecFactory
+public class AMQCodecFactory
 {
-    /** Holds the protocol encoder. */
-    private final AMQEncoder _encoder = new AMQEncoder();
 
     /** Holds the protocol decoder. */
     private final AMQDecoder _frameDecoder;
@@ -56,15 +50,6 @@ public class AMQCodecFactory implements 
         _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
     }
 
-    /**
-     * Gets the AMQP encoder.
-     *
-     * @return The AMQP encoder.
-     */
-    public ProtocolEncoder getEncoder()
-    {
-        return _encoder;
-    }
 
     /**
      * Gets the AMQP decoder.

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Mon Sep 19 15:13:18 2011
@@ -20,13 +20,9 @@
  */
 package org.apache.qpid.codec;
 
-import java.util.ArrayList;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
  * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
  *       per-session overhead.
  */
-public class AMQDecoder extends CumulativeProtocolDecoder
+public class AMQDecoder
 {
-
-    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
-
     /** Holds the 'normal' AMQP data decoder. */
     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
 
@@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
 
     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
     private boolean _expectProtocolInitiation;
-    private boolean firstDecode = true;
 
     private AMQMethodBodyFactory _bodyFactory;
 
-    private ByteBuffer _remainingBuf;
-    
+    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
+
     /**
      * Creates a new AMQP decoder.
      *
@@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
         _bodyFactory = new AMQMethodBodyFactory(session);
     }
 
-    /**
-     * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
-     * intiation decoders.
-     *
-     * @param session The Mina session.
-     * @param in      The raw byte buffer.
-     * @param out     The Mina object output gatherer to write decoded objects to.
-     *
-     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-     *
-     * @throws Exception If the data cannot be decoded for any reason.
-     */
-    protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
-    {
-
-        boolean decoded;
-        if (_expectProtocolInitiation  
-            || (firstDecode
-                && (in.remaining() > 0)
-                && (in.get(in.position()) == (byte)'A')))
-        {
-            decoded = doDecodePI(session, in, out);
-        }
-        else
-        {
-            decoded = doDecodeDataBlock(session, in, out);
-        }
-        if(firstDecode && decoded)
-        {
-            firstDecode = false;
-        }
-        return decoded;
-    }
-
-    /**
-     * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
-     *
-     * @param session The Mina session.
-     * @param in      The raw byte buffer.
-     * @param out     The Mina object output gatherer to write decoded objects to.
-     *
-     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-     *
-     * @throws Exception If the data cannot be decoded for any reason.
-     */
-    protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
-    {
-        int pos = in.position();
-        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
-        in.position(pos);
-        if (!enoughData)
-        {
-            // returning false means it will leave the contents in the buffer and
-            // call us again when more data has been read
-            return false;
-        }
-        else
-        {
-            _dataBlockDecoder.decode(session, in, out);
-
-            return true;
-        }
-    }
-
-    /**
-     * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
-     *
-     * @param session The Mina session.
-     * @param in      The raw byte buffer.
-     * @param out     The Mina object output gatherer to write decoded objects to.
-     *
-     * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
-     *
-     * @throws Exception If the data cannot be decoded for any reason.
-     */
-    private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
-    {
-        boolean enoughData = _piDecoder.decodable(in.buf());
-        if (!enoughData)
-        {
-            // returning false means it will leave the contents in the buffer and
-            // call us again when more data has been read
-            return false;
-        }
-        else
-        {
-            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
-            out.write(pi);
 
-            return true;
-        }
-    }
 
     /**
      * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
         _expectProtocolInitiation = expectProtocolInitiation;
     }
 
-
-    /**
-     * Cumulates content of <tt>in</tt> into internal buffer and forwards
-     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
-     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
-     * and the cumulative buffer is compacted after decoding ends.
-     *
-     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
-     *                               <tt>true</tt> not consuming the cumulative buffer.
-     */
-    public void decode( IoSession session, ByteBuffer in,
-                        ProtocolDecoderOutput out ) throws Exception
+    private class RemainingByteArrayInputStream extends InputStream
     {
-        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
-        // if we have a session buffer, append data to that otherwise
-        // use the buffer read from the network directly
-        if( buf != null )
-        {
-            buf.put( in );
-            buf.flip();
-        }
-        else
+        private int _currentListPos;
+        private int _markPos;
+
+
+        @Override
+        public int read() throws IOException
         {
-            buf = in;
+            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+            if(currentStream.available() > 0)
+            {
+                return currentStream.read();
+            }
+            else if((_currentListPos == _remainingBufs.size())
+                    || (++_currentListPos == _remainingBufs.size()))
+            {
+                return -1;
+            }
+            else
+            {
+
+                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+                stream.mark(0);
+                return stream.read();
+            }
         }
 
-        for( ;; )
+        @Override
+        public int read(final byte[] b, final int off, final int len) throws IOException
         {
-            int oldPos = buf.position();
-            boolean decoded = doDecode( session, buf, out );
-            if( decoded )
+
+            if(_currentListPos == _remainingBufs.size())
             {
-                if( buf.position() == oldPos )
+                return -1;
+            }
+            else
+            {
+                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+                final int available = currentStream.available();
+                int read = currentStream.read(b, off, len > available ? available : len);
+                if(read < len)
                 {
-                    throw new IllegalStateException(
-                            "doDecode() can't return true when buffer is not consumed." );
+                    if(_currentListPos++ != _remainingBufs.size())
+                    {
+                        _remainingBufs.get(_currentListPos).mark(0);
+                    }
+                    int correctRead = read == -1 ? 0 : read;
+                    int subRead = read(b, off+correctRead, len-correctRead);
+                    if(subRead == -1)
+                    {
+                        return read;
+                    }
+                    else
+                    {
+                        return correctRead+subRead;
+                    }
                 }
-
-                if( !buf.hasRemaining() )
+                else
                 {
-                    break;
+                    return len;
                 }
             }
-            else
-            {
-                break;
-            }
         }
 
-        // if there is any data left that cannot be decoded, we store
-        // it in a buffer in the session and next time this decoder is
-        // invoked the session buffer gets appended to
-        if ( buf.hasRemaining() )
+        @Override
+        public int available() throws IOException
         {
-            storeRemainingInSession( buf, session );
+            int total = 0;
+            for(int i = _currentListPos; i < _remainingBufs.size(); i++)
+            {
+                total += _remainingBufs.get(i).available();
+            }
+            return total;
         }
-        else
+
+        @Override
+        public void mark(final int readlimit)
         {
-            removeSessionBuffer( session );
+            _markPos = _currentListPos;
+            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+            if(stream != null)
+            {
+                stream.mark(readlimit);
+            }
         }
-    }
-
-    /**
-     * Releases the cumulative buffer used by the specified <tt>session</tt>.
-     * Please don't forget to call <tt>super.dispose( session )</tt> when
-     * you override this method.
-     */
-    public void dispose( IoSession session ) throws Exception
-    {
-        removeSessionBuffer( session );
-    }
 
-    private void removeSessionBuffer(IoSession session)
-    {
-        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
-        if( buf != null )
+        @Override
+        public void reset() throws IOException
         {
-            buf.release();
-            session.removeAttribute( BUFFER );
+            _currentListPos = _markPos;
+            final int size = _remainingBufs.size();
+            if(_currentListPos < size)
+            {
+                _remainingBufs.get(_currentListPos).reset();
+            }
+            for(int i = _currentListPos+1; i<size; i++)
+            {
+                _remainingBufs.get(i).reset();
+            }
         }
     }
 
-    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
-
-    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
-    {
-        ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
-        remainingBuf.setAutoExpand( true );
-        remainingBuf.put( buf );
-        session.setAttribute( BUFFER, remainingBuf );
-    }
 
-    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
 
         // get prior remaining data from accumulator
         ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
-        ByteBuffer msg;
-        // if we have a session buffer, append data to that otherwise
-        // use the buffer read from the network directly
-        if( _remainingBuf != null )
+        DataInputStream msg;
+
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+        if(!_remainingBufs.isEmpty())
         {
-            _remainingBuf.put(buf);
-            _remainingBuf.flip();
-            msg = _remainingBuf;
+            _remainingBufs.add(bais);
+            msg = new DataInputStream(new RemainingByteArrayInputStream());
         }
         else
         {
-            msg = ByteBuffer.wrap(buf);
+            msg = new DataInputStream(bais);
         }
-        
-        if (_expectProtocolInitiation  
-            || (firstDecode
-                && (msg.remaining() > 0)
-                && (msg.get(msg.position()) == (byte)'A')))
-        {
-            if (_piDecoder.decodable(msg.buf()))
-            {
-                dataBlocks.add(new ProtocolInitiation(msg.buf()));
-            }
-        }
-        else
+
+        boolean enoughData = true;
+        while (enoughData)
         {
-            boolean enoughData = true;
-            while (enoughData)
+            if(!_expectProtocolInitiation)
             {
-                int pos = msg.position();
-
                 enoughData = _dataBlockDecoder.decodable(msg);
-                msg.position(pos);
                 if (enoughData)
                 {
                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
                 }
-                else
+            }
+            else
+            {
+                enoughData = _piDecoder.decodable(msg);
+                if (enoughData)
                 {
-                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
-                    _remainingBuf.setAutoExpand(true);
-                    _remainingBuf.put(msg);
+                    dataBlocks.add(new ProtocolInitiation(msg));
+                }
+
+            }
+
+            if(!enoughData)
+            {
+                if(!_remainingBufs.isEmpty())
+                {
+                    _remainingBufs.remove(_remainingBufs.size()-1);
+                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
+                    while(iterator.hasNext() && iterator.next().available() == 0)
+                    {
+                        iterator.remove();
+                    }
+                }
+                if(bais.available()!=0)
+                {
+                    byte[] remaining = new byte[bais.available()];
+                    bais.read(remaining);
+                    _remainingBufs.add(new ByteArrayInputStream(remaining));
                 }
             }
-        }
-        if(firstDecode && dataBlocks.size() > 0)
-        {
-            firstDecode = false;
         }
         return dataBlocks;
     }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Mon Sep 19 15:13:18 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
  */
 public class ClientProperties
 {
-  
+
     /**
      * Currently with Qpid it is not possible to change the client ID.
      * If one is not specified upon connection construction, an id is generated automatically.
@@ -68,38 +68,50 @@ public class ClientProperties
      * by the broker in TuneOK it will be used as the heartbeat interval.
      * If not a warning will be printed and the max value specified for
      * heartbeat in TuneOK will be used
-     * 
+     *
      * The default idle timeout is set to 120 secs
      */
     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-    
+
     public static final String HEARTBEAT = "qpid.heartbeat";
     public static final int HEARTBEAT_DEFAULT = 120;
-    
+
     /**
      * This value will be used to determine the default destination syntax type.
      * Currently the two types are Binding URL (java only) and the Addressing format (used by
-     * all clients). 
+     * all clients).
      */
     public static final String DEST_SYNTAX = "qpid.dest_syntax";
-    
+
     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
 
     public static final String AMQP_VERSION = "qpid.amqp.version";
-    
-    private static ClientProperties _instance = new ClientProperties();
-    
+
+    public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+
+    /**
+     * System properties to change the default timeout used during
+     * synchronous operations.
+     */
+    public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
+    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
+
+    /**
+     * A default timeout value for synchronous operations
+     */
+    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
+
     /*
-    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME = 
+    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-    
+
     public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-    
-    
+
+
     public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-    
-    
+
+
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.AMQException;
 
@@ -34,7 +36,7 @@ public interface AMQBody
      */
     public abstract int getSize();
     
-    public void writePayload(ByteBuffer buffer);
+    public void writePayload(DataOutputStream buffer) throws IOException;
     
-    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
+    void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 
 /**
  * A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
      * Writes the datablock to the specified buffer.
      * @param buffer
      */
-    public abstract void writePayload(ByteBuffer buffer);
-
-    public ByteBuffer toByteBuffer()
-    {
-        final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
-
-        writePayload(buffer);    
-        buffer.flip();
-        return buffer;
-    }
-
-    public java.nio.ByteBuffer toNioByteBuffer()
-    {
-        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
-
-        ByteBuffer buf = ByteBuffer.wrap(buffer);
-        writePayload(buf);    
-        buffer.flip();
-        return buffer;
-    }
+    public abstract void writePayload(DataOutputStream buffer) throws IOException;
 
 }

Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Mon Sep 19 15:13:18 2011
@@ -20,18 +20,14 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+
 public class AMQDataBlockDecoder
 {
-    private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
 
     private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
 
@@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
+    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
     {
-        final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
         // type, channel, body length and end byte
         if (remainingAfterAttributes < 0)
         {
             return false;
         }
 
-        in.position(in.position() + 1 + 2);
+        in.mark(8);
+        in.skip(1 + 2);
+
+
         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
-        final long bodySize = in.getInt() & 0xffffffffL; 
+        final long bodySize = in.readInt() & 0xffffffffL;
+
+        in.reset();
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
-        throws AMQFrameDecodingException, AMQProtocolVersionException
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
-        final byte type = in.get();
+        final byte type = in.readByte();
 
         BodyFactory bodyFactory;
         if (type == AMQMethodBody.TYPE)
@@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
         }
 
-        final int channel = in.getUnsignedShort();
-        final long bodySize = in.getUnsignedInt();
+        final int channel = in.readUnsignedShort();
+        final long bodySize = EncodingUtils.readUnsignedInteger(in);
 
         // bodySize can be zero
         if ((channel < 0) || (bodySize < 0))
@@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
 
         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
 
-        byte marker = in.get();
+        byte marker = in.readByte();
         if ((marker & 0xFF) != 0xCE)
         {
             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
         return frame;
     }
 
-    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
-    {
-        AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
-        if (bodyFactory == null)
-        {
-            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
-            bodyFactory = new AMQMethodBodyFactory(protocolSession);
-            session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
-        }
-        
-        out.write(createAndPopulateFrame(bodyFactory, in));
-    }
-
-    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
-    {
-        return decodable(msg.buf());
-    }
-
-    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
-    {
-        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
-    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org