You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2006/09/20 00:07:25 UTC

svn commit: r447994 [21/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,351 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSBytesMessage extends AbstractJMSMessage implements javax.jms.BytesMessage
+{
+    private static final String MIME_TYPE = "application/octet-stream";
+
+    private boolean _readable = false;
+
+    /**
+     * The default initial size of the buffer. The buffer expands automatically.
+     */
+    private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
+
+    JMSBytesMessage()
+    {
+        this(null);
+    }
+
+    /**
+     * Construct a bytes message with existing data.
+     * The message starts out readable when data is supplied and writable when data is null.
+     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+     * set to auto expand
+     */
+    JMSBytesMessage(ByteBuffer data)
+    {
+        super(data); // this instantiates a content header
+        getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+
+        if (_data == null)
+        {
+            _data = ByteBuffer.allocate(DEFAULT_BUFFER_INITIAL_SIZE);
+            _data.setAutoExpand(true);
+        }
+        _readable = (data != null);
+    }
+
+    JMSBytesMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader)
+            throws AMQException
+    {
+        // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
+        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+        getJmsContentHeaderProperties().setContentType(MIME_TYPE);        
+        _readable = true;
+    }
+
+    public void clearBody() throws JMSException
+    {
+        _data.clear();
+        _readable = false;
+    }
+
+    /**
+     * Renders the message body as text by delegating to getText().
+     *
+     * @return whatever getText() returns (may be null)
+     * @throws JMSException if the message is not in readable mode, or if getText() fails; in the
+     * latter case the underlying IOException is attached as the linked exception
+     */
+    public String toBodyString() throws JMSException
+    {
+        checkReadable();
+        try
+        {
+            return getText();
+        }
+        catch (IOException e)
+        {
+            // Keep the root cause reachable, in the same way readUTF() and writeUTF() do.
+            JMSException je = new JMSException(e.toString());
+            je.setLinkedException(e);
+            throw je;
+        }
+    }
+
+    /**
+     * Decodes the buffer contents as text. The buffer is rewound before decoding and its previous position
+     * is restored afterwards. This means that toString() will always output the entire message and also
+     * that the caller can then immediately start reading as if toString() had never been called.
+     * @return the decoded string, or null if the buffer is null or has nothing remaining after the rewind
+     * @throws IOException if the decoder fails (presumably a CharacterCodingException - confirm)
+     */
+    private String getText() throws IOException
+    {
+        // decoding below uses the UTF8 charset, not the default platform encoding
+        if (_data == null)
+        {
+            return null;
+        }
+        int pos = _data.position();
+        _data.rewind();
+        // an empty buffer after the rewind means there is nothing to decode
+        if (_data.remaining() == 0)
+        {
+            // this is really redundant since pos must be zero
+            _data.position(pos);
+            return null;
+        }
+        else
+        {
+            String data = _data.getString(Charset.forName("UTF8").newDecoder());
+            _data.position(pos);
+            return data;
+        }
+    }
+
+    public String getMimeType()
+    {
+        return MIME_TYPE;
+    }
+
+    public long getBodyLength() throws JMSException
+    {
+        checkReadable();
+        return _data.limit();
+    }
+
+    private void checkReadable() throws MessageNotReadableException
+    {
+        if (!_readable)
+        {
+            throw new MessageNotReadableException("You need to call reset() to make the message readable");
+        }
+    }
+
+    private void checkWritable() throws MessageNotWriteableException
+    {
+        if (_readable)
+        {
+            throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
+        }
+    }
+
+    public boolean readBoolean() throws JMSException
+    {
+        checkReadable();
+        return _data.get() != 0;
+    }
+
+    public byte readByte() throws JMSException
+    {
+        checkReadable();
+        return _data.get();
+    }
+
+    public int readUnsignedByte() throws JMSException
+    {
+        checkReadable();
+        return _data.getUnsigned();
+    }
+
+    public short readShort() throws JMSException
+    {
+        checkReadable();
+        return _data.getShort();
+    }
+
+    public int readUnsignedShort() throws JMSException
+    {
+        checkReadable();
+        return _data.getUnsignedShort();
+    }
+
+    public char readChar() throws JMSException
+    {
+        checkReadable();
+        return _data.getChar();
+    }
+
+    public int readInt() throws JMSException
+    {
+        checkReadable();
+        return _data.getInt();
+    }
+
+    public long readLong() throws JMSException
+    {
+        checkReadable();
+        return _data.getLong();
+    }
+
+    public float readFloat() throws JMSException
+    {
+        checkReadable();
+        return _data.getFloat();
+    }
+
+    public double readDouble() throws JMSException
+    {
+        checkReadable();
+        return _data.getDouble();
+    }
+
+    public String readUTF() throws JMSException
+    {
+        checkReadable();
+        try
+        {
+            return _data.getString(Charset.forName("UTF-8").newDecoder());
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException je = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+            je.setLinkedException(e);
+            throw je;
+        }
+    }
+
+    /**
+     * Copies up to bytes.length bytes from the current read position into the supplied array.
+     *
+     * @param bytes destination array; must not be null
+     * @return the number of bytes copied, or -1 when no bytes remain in the body
+     * @throws IllegalArgumentException if bytes is null
+     * @throws JMSException if the message is not in readable mode
+     */
+    public int readBytes(byte[] bytes) throws JMSException
+    {
+        if (bytes == null)
+        {
+            throw new IllegalArgumentException("byte array must not be null");
+        }
+        checkReadable();
+        int available = _data.remaining();
+        if (available == 0)
+        {
+            // javax.jms.BytesMessage requires -1 (not 0) once the end of the stream has been reached
+            return -1;
+        }
+        int count = (available >= bytes.length ? bytes.length : available);
+        _data.get(bytes, 0, count);
+        return count;
+    }
+
+    /**
+     * Copies up to maxLength bytes from the current read position into the supplied array.
+     *
+     * @param bytes destination array; must not be null
+     * @param maxLength the maximum number of bytes to copy; must not exceed bytes.length
+     * @return the number of bytes copied, or -1 when no bytes remain in the body
+     * @throws IllegalArgumentException if bytes is null or maxLength is greater than bytes.length
+     * @throws JMSException if the message is not in readable mode
+     */
+    public int readBytes(byte[] bytes, int maxLength) throws JMSException
+    {
+        if (bytes == null)
+        {
+            throw new IllegalArgumentException("byte array must not be null");
+        }
+        if (maxLength > bytes.length)
+        {
+            throw new IllegalArgumentException("maxLength must be <= bytes.length");
+        }
+        checkReadable();
+        int available = _data.remaining();
+        if (available == 0)
+        {
+            // javax.jms.BytesMessage requires -1 (not 0) once the end of the stream has been reached
+            return -1;
+        }
+        int count = (available >= maxLength ? maxLength : available);
+        _data.get(bytes, 0, count);
+        return count;
+    }
+
+    public void writeBoolean(boolean b) throws JMSException
+    {
+        checkWritable();
+        _data.put(b?(byte)1:(byte)0);
+    }
+
+    public void writeByte(byte b) throws JMSException
+    {
+        checkWritable();
+        _data.put(b);
+    }
+
+    public void writeShort(short i) throws JMSException
+    {
+        checkWritable();
+        _data.putShort(i);
+    }
+
+    public void writeChar(char c) throws JMSException
+    {
+        checkWritable();
+        _data.putChar(c);
+    }
+
+    public void writeInt(int i) throws JMSException
+    {
+        checkWritable();
+        _data.putInt(i);
+    }
+
+    public void writeLong(long l) throws JMSException
+    {
+        checkWritable();
+        _data.putLong(l);
+    }
+
+    public void writeFloat(float v) throws JMSException
+    {
+        checkWritable();
+        _data.putFloat(v);
+    }
+
+    public void writeDouble(double v) throws JMSException
+    {
+        checkWritable();
+        _data.putDouble(v);
+    }
+
+    public void writeUTF(String string) throws JMSException
+    {
+        checkWritable();
+        try
+        {
+            _data.putString(string, Charset.forName("UTF-8").newEncoder());
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException ex = new JMSException("Unable to encode string: " + e);
+            ex.setLinkedException(e);
+            throw ex;
+        }
+    }
+
+    public void writeBytes(byte[] bytes) throws JMSException
+    {
+        checkWritable();
+        _data.put(bytes);
+    }
+
+    public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+    {
+        checkWritable();
+        _data.put(bytes, offset, length);
+    }
+
+    /**
+     * Writes an object to the body using the typed write method that matches its runtime class.
+     * Only the wrapper types for Java primitives, String and byte[] are accepted, as required by
+     * javax.jms.BytesMessage; the value is written exactly as the corresponding writeXxx call
+     * would write it (a String goes through writeUTF, a byte[] through writeBytes).
+     *
+     * @param object the value to write; must not be null
+     * @throws NullPointerException if object is null
+     * @throws javax.jms.MessageFormatException if object is not one of the supported types
+     * @throws JMSException if the message is not in writable mode or the value cannot be written
+     */
+    public void writeObject(Object object) throws JMSException
+    {
+        checkWritable();
+        if (object == null)
+        {
+            throw new NullPointerException("Argument must not be null");
+        }
+        if (object instanceof Boolean)
+        {
+            writeBoolean(((Boolean) object).booleanValue());
+        }
+        else if (object instanceof Byte)
+        {
+            writeByte(((Byte) object).byteValue());
+        }
+        else if (object instanceof Short)
+        {
+            writeShort(((Short) object).shortValue());
+        }
+        else if (object instanceof Character)
+        {
+            writeChar(((Character) object).charValue());
+        }
+        else if (object instanceof Integer)
+        {
+            writeInt(((Integer) object).intValue());
+        }
+        else if (object instanceof Long)
+        {
+            writeLong(((Long) object).longValue());
+        }
+        else if (object instanceof Float)
+        {
+            writeFloat(((Float) object).floatValue());
+        }
+        else if (object instanceof Double)
+        {
+            writeDouble(((Double) object).doubleValue());
+        }
+        else if (object instanceof String)
+        {
+            writeUTF((String) object);
+        }
+        else if (object instanceof byte[])
+        {
+            writeBytes((byte[]) object);
+        }
+        else
+        {
+            // Fully qualified because this file does not import MessageFormatException.
+            throw new javax.jms.MessageFormatException(
+                    "Only wrapped primitives, String and byte[] can be written, not " + object.getClass().getName());
+        }
+    }
+
+    public void reset() throws JMSException
+    {
+        //checkWritable();
+        _data.flip();
+        _readable = true;
+    }
+
+    public boolean isReadable()
+    {
+        return _readable;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
+{
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSBytesMessage(deliveryTag, data, contentHeader);
+    }
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSBytesMessage();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSBytesMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,175 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.ObjectMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotWriteableException;
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
+{
+    static final String MIME_TYPE = "application/java-object-stream";
+    private final boolean _readonly;
+
+    private static final int DEFAULT_BUFFER_SIZE = 1024;
+    /**
+     * Creates empty, writable message for use by producers
+     */
+    JMSObjectMessage()
+    {
+        this(null);
+    }
+
+    private JMSObjectMessage(ByteBuffer data)
+    {
+        super(data);
+        if (data == null)
+        {
+            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+            _data.setAutoExpand(true);
+        }
+        _readonly = (data != null);
+        getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+    }
+
+    /**
+     * Creates read only message for delivery to consumers
+     */
+    JMSObjectMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    {
+        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, data);
+        _readonly = data != null;
+    }
+
+    public void clearBody() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.release();
+        }
+        _data = null;
+    }
+
+    public String toBodyString() throws JMSException
+    {
+        return toString(_data);
+    }
+
+    public String getMimeType()
+    {
+        return MIME_TYPE;
+    }
+
+    /**
+     * Serializes the given object into the message body using Java object serialization.
+     * A fresh auto-expanding buffer is allocated when the message currently has no body buffer.
+     *
+     * @param serializable the object to store
+     * @throws MessageNotWriteableException if this message was created read only
+     * @throws MessageFormatException if serialization fails; the IOException is attached as the
+     * linked exception
+     */
+    public void setObject(Serializable serializable) throws JMSException
+    {
+        if (_readonly)
+        {
+            throw new MessageNotWriteableException("Message is not writable.");
+        }
+
+        if (_data == null)
+        {
+            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+            _data.setAutoExpand(true);
+        }
+        try
+        {
+            ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
+            out.writeObject(serializable);
+            out.flush();
+            out.close();
+            // NOTE(review): the buffer is rewound rather than flipped, so the limit is left untouched;
+            // confirm the send path does not depend on the limit marking the end of the payload.
+            _data.rewind();
+        }
+        catch (IOException e)
+        {
+            // Preserve the root cause instead of only embedding its text in the message.
+            MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+            mfe.setLinkedException(e);
+            throw mfe;
+        }
+    }
+
+    /**
+     * Deserializes the object held in the message body.
+     * The buffer is rewound afterwards, whether or not deserialization succeeded.
+     *
+     * @return the deserialized object, or null if the message has no body buffer
+     * @throws MessageFormatException if the body cannot be deserialized; the underlying exception
+     * is attached as the linked exception
+     */
+    public Serializable getObject() throws JMSException
+    {
+        ObjectInputStream in = null;
+        if (_data == null)
+        {
+            return null;
+        }
+
+        try
+        {
+            in = new ObjectInputStream(_data.asInputStream());
+            return (Serializable) in.readObject();
+        }
+        catch (IOException e)
+        {
+            throw deserializationFailure(e);
+        }
+        catch (ClassNotFoundException e)
+        {
+            throw deserializationFailure(e);
+        }
+        finally
+        {
+            _data.rewind();
+            close(in);
+        }
+    }
+
+    /**
+     * Builds the exception reported when the body cannot be deserialized, keeping the cause linked.
+     */
+    private static MessageFormatException deserializationFailure(Exception cause)
+    {
+        MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + cause);
+        mfe.setLinkedException(cause);
+        return mfe;
+    }
+
+    private static void close(InputStream in)
+    {
+        try
+        {
+            if (in != null)
+            {
+                in.close();
+            }
+        }
+        catch (IOException ignore)
+        {
+        }
+    }
+
+    private static String toString(ByteBuffer data)
+    {
+        if (data == null)
+        {
+            return null;
+        }
+        int pos = data.position();
+        try
+        {
+            return data.getString(Charset.forName("UTF8").newDecoder());
+        }
+        catch (CharacterCodingException e)
+        {
+            return null;
+        }
+        finally
+        {
+            data.position(pos);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
+{
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSObjectMessage(deliveryTag, data, contentHeader);
+    }
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSObjectMessage();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSObjectMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,159 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.mina.common.ByteBuffer;
+
+import javax.jms.JMSException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharacterCodingException;
+
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
+{
+    private static final String MIME_TYPE = "text/plain";
+
+    private String _decodedValue;
+
+    JMSTextMessage() throws JMSException
+    {
+        this(null, null);
+    }
+
+    JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+    {
+        super(data); // this instantiates a content header
+        getJmsContentHeaderProperties().setContentType(MIME_TYPE);
+        getJmsContentHeaderProperties().setEncoding(encoding);
+    }
+
+    JMSTextMessage(long deliveryTag, ByteBuffer data, BasicContentHeaderProperties contentHeader)
+            throws AMQException
+    {
+        super(deliveryTag, contentHeader, data);
+        contentHeader.setContentType(MIME_TYPE);
+        _data = data;
+    }
+
+    JMSTextMessage(ByteBuffer data) throws JMSException
+    {
+        this(data, null);
+    }
+
+    JMSTextMessage(String text) throws JMSException
+    {
+        super((ByteBuffer)null);
+        setText(text);
+    }
+
+    public void clearBody() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.release();
+        }
+        _data = null;
+        _decodedValue = null;
+    }
+
+    public String toBodyString() throws JMSException
+    {
+        return getText();
+    }
+
+    public void setData(ByteBuffer data)
+    {
+        _data = data;
+    }
+
+    public String getMimeType()
+    {
+        return MIME_TYPE;
+    }
+
+    /**
+     * Replaces the message body with the given text.
+     * The text is encoded with the encoding named in the content header, or with the platform
+     * default charset when the header carries no encoding.
+     *
+     * @param string the new body text; null leaves the message with an empty (cleared) body
+     * @throws JMSException if the encoding named in the content header is not supported; the
+     * UnsupportedEncodingException is attached as the linked exception
+     */
+    public void setText(String string) throws JMSException
+    {
+        clearBody();
+        if (string == null)
+        {
+            // clearBody() has already reset the buffer and cached value; avoid a NullPointerException below
+            return;
+        }
+        try
+        {
+            String encoding = getJmsContentHeaderProperties().getEncoding();
+            // Encode first so the buffer is sized by byte count rather than char count, and so that
+            // a failed encode does not leave a half-initialised buffer behind.
+            byte[] encoded = (encoding == null) ? string.getBytes() : string.getBytes(encoding);
+            _data = ByteBuffer.allocate(encoded.length);
+            _data.limit(encoded.length);
+            //_data.sweep();
+            _data.setAutoExpand(true);
+            _data.put(encoded);
+            _decodedValue = string;
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            // should never occur
+            JMSException je = new JMSException("Unable to encode string data: " + e);
+            je.setLinkedException(e);
+            throw je;
+        }
+    }
+
+    /**
+     * Returns the body text, decoding it from the buffer on first use and caching the result.
+     * The charset is the one named in the content header, or the platform default when the header
+     * carries no encoding.
+     *
+     * @return the body text, or null when there is neither a cached value nor a body buffer
+     * @throws JMSException if the buffer cannot be decoded; the CharacterCodingException is attached
+     * as the linked exception
+     */
+    public String getText() throws JMSException
+    {
+        if (_decodedValue != null)
+        {
+            return _decodedValue;
+        }
+        if (_data == null)
+        {
+            return null;
+        }
+
+        _data.rewind();
+        final String encodingName = getJmsContentHeaderProperties().getEncoding();
+        final Charset charset = (encodingName != null) ? Charset.forName(encodingName) : Charset.defaultCharset();
+        try
+        {
+            _decodedValue = _data.getString(charset.newDecoder());
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException je = new JMSException("Could not decode string data: " + e);
+            je.setLinkedException(e);
+            throw je;
+        }
+        return _decodedValue;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+
+public class JMSTextMessageFactory extends AbstractJMSMessageFactory
+{
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSTextMessage();
+    }
+
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSTextMessage(deliveryTag, data, (BasicContentHeaderProperties)contentHeader.properties);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/JMSTextMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+import java.util.List;
+
+
+public interface MessageFactory
+{
+    AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+                                     ContentHeaderBody contentHeader,
+                                     List bodies)
+        throws JMSException, AMQException;
+
+    AbstractJMSMessage createMessage() throws JMSException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import javax.jms.JMSException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Maps MIME content types to the {@link MessageFactory} that creates messages of that type.
+ *
+ * The registry is backed by a plain HashMap and performs no synchronisation of its own.
+ */
+public class MessageFactoryRegistry
+{
+    /** MIME type (String, may be null) to MessageFactory. */
+    private final Map _mimeToFactoryMap = new HashMap();
+
+    /**
+     * Registers a factory for a MIME type, replacing any factory previously registered for it.
+     *
+     * @param mimeType the MIME type; null is accepted as a key (the default registry uses it for
+     * content headers that carry no content type)
+     * @param mf the factory, must not be null
+     * @throws IllegalArgumentException if mf is null
+     */
+    public void registerFactory(String mimeType, MessageFactory mf)
+    {
+        if (mf == null)
+        {
+            throw new IllegalArgumentException("Message factory must not be null");
+        }
+        _mimeToFactoryMap.put(mimeType, mf);
+    }
+
+    /**
+     * Removes the factory registered for a MIME type.
+     *
+     * @param mimeType the MIME type to deregister
+     * @return the factory that was registered, or null if there was none
+     */
+    public MessageFactory deregisterFactory(String mimeType)
+    {
+        return (MessageFactory) _mimeToFactoryMap.remove(mimeType);
+    }
+
+    /**
+     * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+     * concrete message type.
+     * @param deliveryTag the AMQ message id
+     * @param redelivered true if redelivered
+     * @param contentHeader the content header that was received
+     * @param bodies a list of ContentBody instances
+     * @return the message.
+     * @throws AMQException if no factory is registered for the content type, or from the factory itself
+     * @throws JMSException from the factory
+     */
+    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+                                            ContentHeaderBody contentHeader,
+                                            List bodies) throws AMQException, JMSException
+    {
+        final BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+        final MessageFactory mf = lookupFactory(properties.getContentType());
+        return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+    }
+
+    /**
+     * Create a new, empty message of the given MIME type.
+     *
+     * @param mimeType the MIME type, must not be null
+     * @return the message
+     * @throws IllegalArgumentException if mimeType is null
+     * @throws AMQException if no factory is registered for the MIME type
+     * @throws JMSException from the factory
+     */
+    public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+    {
+        if (mimeType == null)
+        {
+            throw new IllegalArgumentException("Mime type must not be null");
+        }
+        return lookupFactory(mimeType).createMessage();
+    }
+
+    /**
+     * Finds the factory registered for a MIME type or fails with a descriptive error.
+     *
+     * @param mimeType the MIME type to look up (null is a legal key)
+     * @return the registered factory, never null
+     * @throws AMQException if nothing is registered for the MIME type
+     */
+    private MessageFactory lookupFactory(String mimeType) throws AMQException
+    {
+        final MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(mimeType);
+        if (mf == null)
+        {
+            throw new AMQException("Unsupported MIME type of " + mimeType);
+        }
+        return mf;
+    }
+
+    /**
+     * Construct a new registry with the default message factories registered
+     * @return a message factory registry
+     */
+    public static MessageFactoryRegistry newDefaultRegistry()
+    {
+        final MessageFactoryRegistry registry = new MessageFactoryRegistry();
+        registry.registerFactory("text/plain", new JMSTextMessageFactory());
+        registry.registerFactory("text/xml", new JMSTextMessageFactory());
+        registry.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
+        registry.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+        // A content header without a content type is treated as a bytes message
+        registry.registerFactory(null, new JMSBytesMessageFactory());
+        return registry;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/MessageFactoryRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * AMQException subtype declared by UnprocessedMessage.receiveBody(). Judging by the name it signals a
+ * content body that arrived when none was expected - confirm against the code that throws it.
+ *
+ * Each constructor passes its arguments straight through to the AMQException constructor with the
+ * same parameter list.
+ */
+public class UnexpectedBodyReceivedException extends AMQException
+{
+
+    /**
+     * @param logger handed to the superclass constructor
+     * @param msg the error message
+     * @param t the underlying cause
+     */
+    public UnexpectedBodyReceivedException(Logger logger, String msg, Throwable t)
+    {
+        super(logger, msg, t);
+    }
+
+    /**
+     * @param logger handed to the superclass constructor
+     * @param msg the error message
+     */
+    public UnexpectedBodyReceivedException(Logger logger, String msg)
+    {
+        super(logger, msg);
+    }
+
+    /**
+     * @param logger handed to the superclass constructor
+     * @param errorCode error code handed to the superclass constructor
+     * @param msg the error message
+     */
+    public UnexpectedBodyReceivedException(Logger logger, int errorCode, String msg)
+    {
+        super(logger, errorCode, msg);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnexpectedBodyReceivedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.framing.*;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Collects the pieces from which a JMS message is later built: the deliver (or return) body, the
+ * content header and however many content bodies the payload was split into.
+ *
+ * Turning these pieces into a JMS message for client code happens outside of the MINA dispatcher
+ * thread, so that the dispatcher thread does as little work as possible.
+ */
+public class UnprocessedMessage
+{
+    public BasicDeliverBody deliverBody;
+    public BasicReturnBody bounceBody; // TODO: check change (gustavo)
+    public int channelId;
+    public ContentHeaderBody contentHeader;
+
+    /**
+     * ContentBody instances in arrival order. Fragmentation means the final size is not known up front.
+     */
+    public List bodies = new LinkedList();
+
+    /** Total payload bytes across every body received so far. */
+    private long _payloadBytesSoFar = 0;
+
+    /**
+     * Appends a content body and adds the remaining bytes of its payload to the running total.
+     * A body with a null payload is still recorded but contributes no bytes.
+     *
+     * @param body the content body that was received
+     * @throws UnexpectedBodyReceivedException declared by the signature; this implementation never throws it
+     */
+    public void receiveBody(ContentBody body) throws UnexpectedBodyReceivedException
+    {
+        bodies.add(body);
+        if (body.payload == null)
+        {
+            return;
+        }
+        _payloadBytesSoFar += body.payload.remaining();
+    }
+
+    /**
+     * @return true when the bytes received so far equal the body size stated in the content header.
+     * The content header must have been set before this is called.
+     */
+    public boolean isAllBodyDataReceived()
+    {
+        return contentHeader.bodySize == _payloadBytesSoFar;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/message/UnprocessedMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Value holder describing one received method frame: the channel it arrived on, the method body and
+ * the protocol session it belongs to. Nothing in this class changes the values after construction.
+ */
+public class AMQMethodEvent
+{
+    private int _channelId;
+
+    private AMQMethodBody _method;
+
+    private AMQProtocolSession _protocolSession;
+
+    /**
+     * @param channelId the channel the method frame arrived on
+     * @param method the method body carried by the frame
+     * @param protocolSession the protocol session the frame was received on
+     */
+    public AMQMethodEvent(int channelId, AMQMethodBody method, AMQProtocolSession protocolSession)
+    {
+        _protocolSession = protocolSession;
+        _method = method;
+        _channelId = channelId;
+    }
+
+    /** @return the method body */
+    public AMQMethodBody getMethod()
+    {
+        return _method;
+    }
+
+    /** @return the channel id */
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    /** @return the protocol session */
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    /**
+     * @return a multi-line description containing the channel id and the method; the protocol session
+     * is not included
+     */
+    public String toString()
+    {
+        return "Method event: " + "\nChannel id: " + _channelId + "\nMethod: " + _method;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Receives method frames, and the errors raised while handling them, from the protocol handler.
+ */
+public interface AMQMethodListener
+{
+    /**
+     * Invoked when a method frame has been received
+     * @param evt the event
+     * @return true if the handler has processed the method frame, false otherwise. Note
+     * that this does not prohibit the method event being delivered to subsequent listeners
+     * but can be used to determine if nobody has dealt with an incoming method frame.
+     * @throws AMQException if an error has occurred. This exception will be delivered
+     * to all registered listeners using the {@link #error(Exception)} method allowing them to
+     * perform cleanup if necessary.
+     */
+    boolean methodReceived(AMQMethodEvent evt) throws AMQException;
+
+    /**
+     * Callback when an error has occurred. Allows listeners to clean up.
+     * @param e the exception that occurred
+     */
+    void error(Exception e);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQMethodListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,530 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoHandlerAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.qpid.AMQDisconnectedException;
+import org.apache.qpid.AMQException;
+
+import org.apache.qpid.AMQConnectionClosedException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.failover.FailoverHandler;
+import org.apache.qpid.client.failover.FailoverState;
+
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.ssl.BogusSSLContextFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
+public class AMQProtocolHandler extends IoHandlerAdapter
+{
+    private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+
+    /**
+     * The connection that this protocol handler is associated with. There is a 1-1
+     * mapping between connection instances and protocol handler instances.
+     */
+    private AMQConnection _connection;
+
+    /**
+     * Used only when determining whether to add the SSL filter or not. This should be made more
+     * generic in future since we will potentially have many transport layer options
+     */
+    private boolean _useSSL;
+
+    /**
+     * Our wrapper for a protocol session that provides access to session values
+     * in a typesafe manner.
+     */
+    private volatile AMQProtocolSession _protocolSession;
+
+    private AMQStateManager _stateManager = new AMQStateManager();
+
+    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+
+    /**
+     * We create the failover handler when the session is created since it needs a reference to the IoSession in order
+     * to be able to send errors during failover back to the client application. The session won't be available in the
+     * case where we are failing over due to a Connection.Redirect message from the broker.
+     */
+    private FailoverHandler _failoverHandler;
+
+    /**
+     * This flag is used to track whether failover is being attempted. It is used to prevent the application constantly
+     * attempting failover where it is failing.
+     */
+    private FailoverState _failoverState = FailoverState.NOT_STARTED;
+
+    private CountDownLatch _failoverLatch;
+
+    /**
+     * Creates the handler for a connection and registers a frame listener that forwards method
+     * frames and errors to the state manager.
+     *
+     * @param con the connection this handler is associated with
+     */
+    public AMQProtocolHandler(AMQConnection con)
+    {
+        _connection = con;
+
+        // The forwarder looks up _stateManager on every call instead of capturing it, so the state
+        // manager can be replaced (as is done when performing failover) without re-registering anything.
+        final AMQMethodListener stateManagerForwarder = new AMQMethodListener()
+        {
+            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+            {
+                return _stateManager.methodReceived(evt);
+            }
+
+            public void error(Exception e)
+            {
+                _stateManager.error(e);
+            }
+        };
+        _frameListeners.add(stateManagerForwarder);
+    }
+
+    /**
+     * @return true if an SSL filter will be added to the filter chain when a session is created
+     */
+    public boolean isUseSSL()
+    {
+        return _useSSL;
+    }
+
+    /**
+     * @param useSSL true to add an SSL filter when a session is created. The flag is only read in
+     * sessionCreated(), so it has to be set before the session is created to take effect.
+     */
+    public void setUseSSL(boolean useSSL)
+    {
+        _useSSL = useSSL;
+    }
+
+    /**
+     * Prepares a newly created IoSession: creates the failover handler for it, installs the protocol
+     * codec filter (and the SSL filter if enabled) and creates and initialises the protocol session wrapper.
+     *
+     * @param session the session that was created
+     * @throws Exception declared by the handler contract
+     */
+    public void sessionCreated(IoSession session) throws Exception
+    {
+        _logger.debug("Protocol session created for session " + System.identityHashCode(session));
+        // Each IoSession gets its own failover handler
+        _failoverHandler = new FailoverHandler(this, session);
+
+        final ProtocolCodecFilter pcf = new ProtocolCodecFilter(new AMQCodecFactory(false));
+
+        // With the system property amqj.shared_read_write_pool set to true the codec filter is placed in
+        // front of the filter named "AsynchronousWriteFilter" (presumably installed by the transport
+        // set-up code, which is not part of this class); otherwise it goes to the end of the chain.
+        if (Boolean.getBoolean("amqj.shared_read_write_pool"))
+        {
+            session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf);
+        }
+        else
+        {
+            session.getFilterChain().addLast("protocolFilter", pcf);
+        }
+        // we only add the SSL filter where we have an SSL connection
+        if (_useSSL)
+        {
+            //todo FIXME: Bogus context cannot be used in production.
+            SSLFilter sslFilter = new SSLFilter(BogusSSLContextFactory.getInstance(false));
+            sslFilter.setUseClientMode(true);
+            // The SSL filter is registered ahead of the codec filter in the chain
+            session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
+        }
+
+        _protocolSession = new AMQProtocolSession(this, session, _connection);
+        _protocolSession.init();
+    }
+
+    /**
+     * Invoked when the session has been opened. Nothing needs to happen here: the per-session set-up
+     * is done in sessionCreated().
+     *
+     * @param session the session that was opened
+     * @throws Exception declared by the handler contract; never thrown by this implementation
+     */
+    public void sessionOpened(IoSession session) throws Exception
+    {
+        // Intentionally empty. This previously set the JVM-wide system property "foo" to "bar", a debugging
+        // leftover that changed global state every time a session was opened.
+    }
+
+    /**
+     * When the broker connection dies we can either get sessionClosed() called or exceptionCaught() followed by
+     * sessionClosed() depending on whether we were trying to send data at the time of failure.
+     *
+     * If the connection was closed by the client nothing more than logging happens. Otherwise failover is
+     * started when the connection allows it and no failover has been started yet; if failover is neither
+     * possible nor already running, the connection is told about the disconnection.
+     *
+     * @param session the session that was closed
+     * @throws Exception declared by the handler contract
+     */
+    public void sessionClosed(IoSession session) throws Exception
+    {
+        //todo server just closes session with no warning if auth fails.
+        if (_connection.isClosed())
+        {
+            _logger.info("Session closed called by client");
+        }
+        else
+        {
+            _logger.info("Session closed called with failover state currently " + _failoverState);
+
+            // reconnectability was introduced here so as not to disturb the client as they have made their intentions
+            // known through the policy settings.
+
+            if ((_failoverState != FailoverState.IN_PROGRESS) && _connection.failoverAllowed())
+            {
+                // Note: this message is logged before the state check below, so it also appears when
+                // failover is not actually started.
+                _logger.info("FAILOVER STARTING");
+                if (_failoverState == FailoverState.NOT_STARTED)
+                {
+                    // Mark failover as running before the thread is started
+                    _failoverState = FailoverState.IN_PROGRESS;
+                    startFailoverThread();
+                }
+                else
+                {
+                    _logger.info("Not starting failover as state currently " + _failoverState);
+                }
+            }
+            else
+            {
+                // This branch is reached both when the policy forbids failover and when failover is
+                // already in progress; the message below is logged in either case.
+                _logger.info("Failover not allowed by policy.");
+
+                if (_failoverState != FailoverState.IN_PROGRESS)
+                {
+                    _logger.info("sessionClose() not allowed to failover");
+                    _connection.exceptionReceived(
+                            new AMQDisconnectedException("Server closed connection and reconnection " +
+                                    "not permitted."));
+                }
+                else
+                {
+                    _logger.info("sessionClose() failover in progress");
+                }
+            }
+        }
+
+        _logger.info("Protocol Session [" + this + "] closed");
+    }
+
+    /**
+     * See {@link FailoverHandler} to see rationale for separate thread.
+     */
+    private void startFailoverThread()
+    {
+        Thread failoverThread = new Thread(_failoverHandler);
+        failoverThread.setName("Failover");
+        // Do not inherit daemon-ness from current thread as this can be a daemon
+        // thread such as a AnonymousIoService thread.
+        failoverThread.setDaemon(false);
+        failoverThread.start();
+    }
+
+    public void sessionIdle(IoSession session, IdleStatus status) throws Exception
+    {
+        _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + status);
+        if (IdleStatus.WRITER_IDLE.equals(status))
+        {
+            //write heartbeat frame:
+            _logger.debug("Sent heartbeat");
+            session.write(HeartbeatBody.FRAME);
+            HeartbeatDiagnostics.sent();
+        }
+        else if (IdleStatus.READER_IDLE.equals(status))
+        {
+            //failover:
+            HeartbeatDiagnostics.timeout();
+            _logger.warn("Timed out while waiting for heartbeat from peer.");
+            session.close();
+        }
+    }
+
+    public void exceptionCaught(IoSession session, Throwable cause) throws Exception
+    {
+        if (_failoverState == FailoverState.NOT_STARTED)
+        {
+            //if (!(cause instanceof AMQUndeliveredException) && (!(cause instanceof AMQAuthenticationException)))
+            if (cause instanceof AMQConnectionClosedException)
+            {
+                _logger.info("Exception caught therefore going to attempt failover: " + cause, cause);
+                          // this will attemp failover
+
+                sessionClosed(session);
+            }
+        }
+        // we reach this point if failover was attempted and failed therefore we need to let the calling app
+        // know since we cannot recover the situation
+        else if (_failoverState == FailoverState.FAILED)
+        {
+            _logger.error("Exception caught by protocol handler: " + cause, cause);
+            // we notify the state manager of the error in case we have any clients waiting on a state
+            // change. Those "waiters" will be interrupted and can handle the exception
+            AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
+            propagateExceptionToWaiters(amqe);
+            _connection.exceptionReceived(cause);
+        }
+    }
+
+    /**
+     * There are two cases where we have other threads potentially blocking for events to be handled by this
+     * class. These are for the state manager (waiting for a state change) or a frame listener (waiting for a
+     * particular type of frame to arrive). When an error occurs we need to notify these waiters so that they can
+     * react appropriately.
+     *
+     * @param e the exception to propagate
+     */
+    public void propagateExceptionToWaiters(Exception e)
+    {
+        _stateManager.error(e);
+        final Iterator it = _frameListeners.iterator();
+        while (it.hasNext())
+        {
+            final AMQMethodListener ml = (AMQMethodListener) it.next();
+            ml.error(e);
+        }
+    }
+
+    private static int _messageReceivedCount;
+
+    /**
+     * Dispatches a received frame according to the type of its body. Method frames are offered to every
+     * registered frame listener; content headers and content bodies are handed to the protocol session;
+     * heartbeats are only logged. Finally the connection is told the total number of bytes read.
+     *
+     * @param session the session the frame was received on
+     * @param message the received object, which is cast to AMQFrame
+     * @throws Exception anything thrown while handling the frame, other than an AMQException raised while
+     * dispatching a method frame, which is routed to the listeners and to exceptionCaught() instead
+     */
+    public void messageReceived(IoSession session, Object message) throws Exception
+    {
+
+        // The counter is static, i.e. shared by all handler instances, and is updated without synchronisation
+        if (_messageReceivedCount++ % 1000 == 0)
+        {
+            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+        }
+        Iterator it = _frameListeners.iterator();
+        AMQFrame frame = (AMQFrame) message;
+
+        HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+
+        if (frame.bodyFrame instanceof AMQMethodBody)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Method frame received: " + frame);
+            }
+
+            final AMQMethodEvent evt = new AMQMethodEvent(frame.channel, (AMQMethodBody)frame.bodyFrame, _protocolSession);
+            try
+            {
+                boolean wasAnyoneInterested = false;
+                while (it.hasNext())
+                {
+                    final AMQMethodListener listener = (AMQMethodListener) it.next();
+                    // The listener call is on the left of || so that every listener is invoked even after
+                    // one of them has already reported the event as processed.
+                    wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                }
+                if (!wasAnyoneInterested)
+                {
+                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
+                }
+            }
+            catch (AMQException e)
+            {
+                // Give every listener the chance to clean up, then apply the normal exception handling
+                it = _frameListeners.iterator();
+                while (it.hasNext())
+                {
+                    final AMQMethodListener listener = (AMQMethodListener) it.next();
+                    listener.error(e);
+                }
+                exceptionCaught(session, e);
+            }
+        }
+        else if (frame.bodyFrame instanceof ContentHeaderBody)
+        {
+            _protocolSession.messageContentHeaderReceived(frame.channel,
+                                                          (ContentHeaderBody) frame.bodyFrame);
+        }
+        else if (frame.bodyFrame instanceof ContentBody)
+        {
+            _protocolSession.messageContentBodyReceived(frame.channel,
+                                                        (ContentBody) frame.bodyFrame);
+        }
+        else if (frame.bodyFrame instanceof HeartbeatBody)
+        {
+            _logger.debug("Received heartbeat");
+        }
+        // Reports the running total of bytes read on the underlying session, not just this frame's size
+        _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+    }
+
+    private static int _messagesOut;
+
+    /**
+     * Bookkeeping after a frame has been written: counts the frame, passes the session's total number of
+     * written bytes to the connection and logs the frame at debug level.
+     *
+     * @param session the session the frame was written to
+     * @param message the frame that was sent
+     * @throws Exception declared by the handler contract
+     */
+    public void messageSent(IoSession session, Object message) throws Exception
+    {
+        // Shared static counter; a progress line is logged for every thousandth frame
+        final int sentBefore = _messagesOut++;
+        if (sentBefore % 1000 == 0)
+        {
+            _logger.debug("Sent " + _messagesOut + " protocol messages");
+        }
+
+        _connection.bytesSent(session.getWrittenBytes());
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sent frame " + message);
+        }
+    }
+
+    /**
+     * Registers a listener that will be offered every method frame received from now on and will be
+     * notified of errors.
+     *
+     * @param listener the listener to add
+     */
+    public void addFrameListener(AMQMethodListener listener)
+    {
+        _frameListeners.add(listener);
+    }
+
+    /**
+     * Deregisters a previously added listener.
+     *
+     * @param listener the listener to remove
+     */
+    public void removeFrameListener(AMQMethodListener listener)
+    {
+        _frameListeners.remove(listener);
+    }
+
+    /**
+     * Delegates to the current state manager's attainState().
+     *
+     * @param s the state to attain
+     * @throws AMQException propagated from the state manager
+     */
+    public void attainState(AMQState s) throws AMQException
+    {
+        _stateManager.attainState(s);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session. Delegates to the protocol
+     * session's writeFrame(frame).
+     *
+     * @param frame the frame to write
+     */
+    public void writeFrame(AMQDataBlock frame)
+    {
+        _protocolSession.writeFrame(frame);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session. Delegates to the protocol
+     * session's writeFrame(frame, wait).
+     *
+     * @param frame the frame to write
+     * @param wait passed through to the protocol session; presumably whether to block until the write
+     * has completed - confirm against AMQProtocolSession
+     */
+    public void writeFrame(AMQDataBlock frame, boolean wait)
+    {
+        _protocolSession.writeFrame(frame, wait);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session and waits for
+     * a particular response.
+     *
+     * The listener is registered before the frame is written and is always deregistered again before this
+     * method returns, whether the wait ends with a reply or with an exception. Without the removal every
+     * call would leave its listener registered, so the listener set would grow without bound and stale
+     * listeners would keep being offered later method frames.
+     *
+     * @param frame the frame to write
+     * @param listener the blocking listener. Note the calling thread will block.
+     * @return the method event returned by the listener's blockForFrame()
+     * @throws AMQException propagated from the listener's blockForFrame()
+     */
+    private AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
+                                                           BlockingMethodFrameListener listener)
+        throws AMQException
+    {
+        _frameListeners.add(listener);
+        try
+        {
+            _protocolSession.writeFrame(frame);
+            // When control resumes after this call, a reply will have been received
+            // that matches the criteria defined in the blocking listener
+            return listener.blockForFrame();
+        }
+        finally
+        {
+            // Nothing else removes this single-use listener, so it has to be done here
+            _frameListeners.remove(listener);
+        }
+    }
+
+    /**
+     * Writes a frame and blocks until a method frame of the given class has arrived on the same channel.
+     *
+     * @param frame the frame to write; its channel is the one the response is expected on
+     * @param responseClass the class of the expected response
+     * @return the method event carrying the response
+     * @throws AMQException propagated from waiting for the response
+     */
+    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
+    {
+        final SpecificMethodFrameListener replyListener =
+            new SpecificMethodFrameListener(frame.channel, responseClass);
+        return writeCommandFrameAndWaitForReply(frame, replyListener);
+    }
+
+    /**
+     * Convenience method to register an AMQSession with the protocol handler. Registering
+     * a session with the protocol handler will ensure that messages are delivered to the
+     * consumer(s) on that session. Delegates to the protocol session.
+     *
+     * @param channelId the channel id of the session
+     * @param session the session instance.
+     */
+    public void addSessionByChannel(int channelId, AMQSession session)
+    {
+        _protocolSession.addSessionByChannel(channelId, session);
+    }
+
+    /**
+     * Convenience method to deregister an AMQSession with the protocol handler.
+     * Delegates to the protocol session.
+     *
+     * @param channelId the channel id of the session
+     */
+    public void removeSessionByChannel(int channelId)
+    {
+        _protocolSession.removeSessionByChannel(channelId);
+    }
+
+    /**
+     * Convenience method that delegates to the protocol session's closeSession().
+     *
+     * @param session the session to close
+     * @throws AMQException propagated from the protocol session
+     */
+    public void closeSession(AMQSession session) throws AMQException
+    {
+        _protocolSession.closeSession(session);
+    }
+
+    /**
+     * Closes the connection to the broker: moves the state manager to CONNECTION_CLOSING, sends a
+     * Connection.Close frame on channel 0 with a success reply code, blocks until the Connection.Close-Ok
+     * arrives and then closes the protocol session.
+     *
+     * @throws AMQException propagated from the state change, from waiting for the reply or from closing
+     * the protocol session
+     */
+    public void closeConnection() throws AMQException
+    {
+        _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+
+        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(
+                0, AMQConstant.REPLY_SUCCESS.getCode(), "JMS client is closing the connection.", 0, 0);
+        // Blocks the calling thread until the broker has acknowledged the close
+        syncWrite(frame, ConnectionCloseOkBody.class);
+
+        _protocolSession.closeProtocolSession();
+    }
+
+    /**
+     * @return the number of bytes read from this protocol session, as reported by the underlying
+     * IoSession. Requires the protocol session to have been created.
+     */
+    public long getReadBytes()
+    {
+        return _protocolSession.getIoSession().getReadBytes();
+    }
+
+    /**
+     * @return the number of bytes written to this protocol session, as reported by the underlying
+     * IoSession. Requires the protocol session to have been created.
+     */
+    public long getWrittenBytes()
+    {
+        return _protocolSession.getIoSession().getWrittenBytes();
+    }
+
+    /**
+     * Starts failover to a specific host and port: sets the target on the failover handler and runs it
+     * on a new thread.
+     * NOTE(review): unlike sessionClosed(), this method does not change the failover state itself before
+     * starting the thread - confirm that the failover handler takes care of that.
+     *
+     * @param host the host to fail over to
+     * @param port the port to fail over to
+     */
+    public void failover(String host, int port)
+    {
+        _failoverHandler.setHost(host);
+        _failoverHandler.setPort(port);
+        // see javadoc for FailoverHandler to see rationale for separate thread
+        startFailoverThread();
+    }
+
+    /**
+     * Blocks the calling thread until the failover latch, if one is currently set, has counted down to
+     * zero. Returns immediately when no latch is set.
+     *
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    public void blockUntilNotFailingOver() throws InterruptedException
+    {
+        // The field is read exactly once. setFailoverLatch() can replace or clear it from another thread,
+        // and clearing it between a null check and the await() call would otherwise cause a
+        // NullPointerException here.
+        final CountDownLatch latch = _failoverLatch;
+        if (latch != null)
+        {
+            latch.await();
+        }
+    }
+
+    /**
+     * @return a queue name generated by the protocol session
+     */
+    public String generateQueueName()
+    {
+        return _protocolSession.generateQueueName();
+    }
+
+    /**
+     * @return the latch that blockUntilNotFailingOver() waits on, or null if none is set
+     */
+    public CountDownLatch getFailoverLatch()
+    {
+        return _failoverLatch;
+    }
+
+    /**
+     * @param failoverLatch the latch that blockUntilNotFailingOver() should wait on; null means that
+     * callers of that method do not block
+     */
+    public void setFailoverLatch(CountDownLatch failoverLatch)
+    {
+        _failoverLatch = failoverLatch;
+    }
+
+    /**
+     * @return the connection this handler is associated with
+     */
+    public AMQConnection getConnection()
+    {
+        return _connection;
+    }
+
+    /**
+     * @return the state manager currently in use
+     */
+    public AMQStateManager getStateManager()
+    {
+        return _stateManager;
+    }
+
+    /**
+     * Replaces the state manager. The listener installed by the constructor reads the field on every
+     * call, so method frames and errors go to the new state manager from then on.
+     *
+     * @param stateManager the state manager to use from now on
+     */
+    public void setStateManager(AMQStateManager stateManager)
+    {
+        _stateManager = stateManager;
+    }
+
+    /**
+     * @return the current failover state (package visibility only)
+     */
+    FailoverState getFailoverState()
+    {
+        return _failoverState;
+    }
+
+    /**
+     * @param failoverState the new failover state; it is consulted by sessionClosed() and
+     * exceptionCaught() when deciding how to react
+     */
+    public void setFailoverState(FailoverState failoverState)
+    {
+        _failoverState = failoverState;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,379 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteFuture;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
+import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersionList;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Wrapper for protocol session that provides type-safe access to session attributes.
+ *
+ * The underlying protocol session is still available but clients should not
+ * use it to obtain session attributes.
+ */
+public class AMQProtocolSession implements ProtocolVersionList
+{
+    private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
+
+    /**
+     * Public attribute key. The value is misspelt, but the constant is public and the literal
+     * may be relied on elsewhere, so it is deliberately left unchanged.
+     */
+    public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
+
+    /** MINA session attribute key under which the negotiated tune parameters are stored. */
+    protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
+
+    /** MINA session attribute key under which the owning AMQConnection is stored. */
+    private static final String AMQ_CONNECTION = "AMQConnection";
+
+    /** MINA session attribute key under which the SASL client in use is stored. */
+    private static final String SASL_CLIENT = "SASLClient";
+
+    private final IoSession _minaProtocolSession;
+
+    /**
+     * The handler from which this session was created and which is used to handle protocol events.
+     * We send failover events to the handler.
+     */
+    private final AMQProtocolHandler _protocolHandler;
+
+    /**
+     * Maps from the channel id to the AMQSession that it represents.
+     */
+    private final ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+
+    /**
+     * Maps from the channel id to the AMQSession for every channel on which the client has
+     * started a close (see closeSession) and not yet seen the matching channel close frame.
+     */
+    private final ConcurrentMap _closingChannels = new ConcurrentHashMap();
+
+    /**
+     * Maps from a channel id to an unprocessed message. This is used to tie together the
+     * JmsDeliverBody (which arrives first) with the subsequent content header and content bodies.
+     */
+    private final ConcurrentMap _channelId2UnprocessedMsgMap = new ConcurrentHashMap();
+
+    /**
+     * Counter to ensure unique queue names. Guarded by _queueIdLock.
+     */
+    private int _queueId = 1;
+    private final Object _queueIdLock = new Object();
+
+    /**
+     * Creates the wrapper and publishes the connection as an attribute of the MINA session.
+     *
+     * @param protocolHandler the handler that failover requests are forwarded to
+     * @param protocolSession the underlying MINA session
+     * @param connection the connection stored on the MINA session for later retrieval
+     */
+    public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
+    {
+        _protocolHandler = protocolHandler;
+        _minaProtocolSession = protocolSession;
+        // properties of the connection are made available to the event handlers
+        _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
+    }
+
+    /**
+     * Starts the process of setting up the connection by writing the protocol initiation.
+     * This is the first place that data is written to the server.
+     */
+    public void init()
+    {
+        /* Find last protocol version in protocol version list. Make sure last protocol version
+        listed in the build file (build-module.xml) is the latest version which will be used
+        here. */
+        int i = pv.length - 1;
+        _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+    }
+
+    /**
+     * @return the client id of the connection, or null if the connection reports a JMSException
+     */
+    public String getClientID()
+    {
+        try
+        {
+            return getAMQConnection().getClientID();
+        }
+        catch (JMSException e)
+        {
+            // we never throw a JMSException here
+            return null;
+        }
+    }
+
+    public void setClientID(String clientID) throws JMSException
+    {
+        getAMQConnection().setClientID(clientID);
+    }
+
+    public String getVirtualHost()
+    {
+        return getAMQConnection().getVirtualHost();
+    }
+
+    public String getUsername()
+    {
+        return getAMQConnection().getUsername();
+    }
+
+    public String getPassword()
+    {
+        return getAMQConnection().getPassword();
+    }
+
+    public IoSession getIoSession()
+    {
+        return _minaProtocolSession;
+    }
+
+    public SaslClient getSaslClient()
+    {
+        return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+    }
+
+    /**
+     * Store the SASL client currently being used for the authentication handshake
+     * @param client if non-null, stores this in the session. if null clears any existing client
+     * being stored
+     */
+    public void setSaslClient(SaslClient client)
+    {
+        if (client == null)
+        {
+            _minaProtocolSession.removeAttribute(SASL_CLIENT);
+        }
+        else
+        {
+            _minaProtocolSession.setAttribute(SASL_CLIENT, client);
+        }
+    }
+
+    public ConnectionTuneParameters getConnectionTuneParameters()
+    {
+        return (ConnectionTuneParameters) _minaProtocolSession.getAttribute(CONNECTION_TUNE_PARAMETERS);
+    }
+
+    /**
+     * Stores the tune parameters on the MINA session, copies the channel and frame limits
+     * onto the connection and configures heartbeating from the heartbeat value.
+     *
+     * @param params the tune parameters to apply
+     */
+    public void setConnectionTuneParameters(ConnectionTuneParameters params)
+    {
+        _minaProtocolSession.setAttribute(CONNECTION_TUNE_PARAMETERS, params);
+        AMQConnection con = getAMQConnection();
+        con.setMaximumChannelCount(params.getChannelMax());
+        con.setMaximumFrameSize(params.getFrameMax());
+        initHeartbeats((int) params.getHeartbeat());
+    }
+
+    /**
+     * Callback invoked from the BasicDeliverMethodHandler when a message has been received.
+     * This is invoked on the MINA dispatcher thread.
+     * @param message the message whose header and bodies are still to arrive
+     * @throws AMQException if this was not expected
+     */
+    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
+    {
+        _channelId2UnprocessedMsgMap.put(message.channelId, message);
+    }
+
+    /**
+     * Attaches a content header to the pending message on the given channel and delivers the
+     * message straight away when the header declares an empty body.
+     *
+     * @param channelId the channel the header arrived on
+     * @param contentHeader the content header
+     * @throws AMQException if no message is pending on the channel or it already has a header
+     */
+    public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader)
+            throws AMQException
+    {
+        UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+        if (msg == null)
+        {
+            throw new AMQException("Error: received content header without having received a BasicDeliver frame first");
+        }
+        if (msg.contentHeader != null)
+        {
+            // the pending message can no longer be completed, so discard it in the same way
+            // as the error paths in messageContentBodyReceived do
+            _channelId2UnprocessedMsgMap.remove(channelId);
+            throw new AMQException("Error: received duplicate content header or did not receive correct number of content body frames");
+        }
+        msg.contentHeader = contentHeader;
+        if (contentHeader.bodySize == 0)
+        {
+            deliverMessageToAMQSession(channelId, msg);
+        }
+    }
+
+    /**
+     * Adds a content body to the pending message on the given channel and delivers the
+     * message once all of its body data has been received.
+     *
+     * @param channelId the channel the body arrived on
+     * @param contentBody the content body
+     * @throws AMQException if no message is pending on the channel, if it has no content header
+     * yet, or if the message rejects the body
+     */
+    public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+    {
+        UnprocessedMessage msg = (UnprocessedMessage) _channelId2UnprocessedMsgMap.get(channelId);
+        if (msg == null)
+        {
+            throw new AMQException("Error: received content body without having received a JMSDeliver frame first");
+        }
+        if (msg.contentHeader == null)
+        {
+            _channelId2UnprocessedMsgMap.remove(channelId);
+            throw new AMQException("Error: received content body without having received a ContentHeader frame first");
+        }
+        try
+        {
+            msg.receiveBody(contentBody);
+        }
+        catch (UnexpectedBodyReceivedException e)
+        {
+            _channelId2UnprocessedMsgMap.remove(channelId);
+            throw e;
+        }
+        if (msg.isAllBodyDataReceived())
+        {
+            deliverMessageToAMQSession(channelId,  msg);
+        }
+    }
+
+    /**
+     * Deliver a message to the appropriate session, removing the unprocessed message
+     * from our map. If no session is registered for the channel the message is dropped
+     * and a warning is logged.
+     * @param channelId the channel id the message should be delivered to
+     * @param msg the message
+     */
+    private void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
+    {
+        // remove first so that a failure during delivery cannot leave the entry behind
+        _channelId2UnprocessedMsgMap.remove(channelId);
+        AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+        if (session == null)
+        {
+            // the session may have been deregistered while the message was still arriving
+            _logger.warn("Dropping message received on channel " + channelId + ": no session is registered for that channel");
+            return;
+        }
+        session.messageReceived(msg);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session. Equivalent
+     * to calling getIoSession().write().
+     *
+     * @param frame the frame to write
+     */
+    public void writeFrame(AMQDataBlock frame)
+    {
+        _minaProtocolSession.write(frame);
+    }
+
+    /**
+     * Writes a frame to the protocol session, optionally blocking until the write completes.
+     *
+     * @param frame the frame to write
+     * @param wait if true, join the write future before returning
+     */
+    public void writeFrame(AMQDataBlock frame, boolean wait)
+    {
+        WriteFuture f = _minaProtocolSession.write(frame);
+        if (wait)
+        {
+            f.join();
+        }
+    }
+
+    /**
+     * Registers a session against its channel id.
+     *
+     * @param channelId the channel id, which must be greater than zero
+     * @param session the session, which must not be null
+     * @throws IllegalArgumentException if either argument is invalid
+     */
+    public void addSessionByChannel(int channelId, AMQSession session)
+    {
+        if (channelId <= 0)
+        {
+            throw new IllegalArgumentException("Attempt to register a session with a channel id <= zero");
+        }
+        if (session == null)
+        {
+            throw new IllegalArgumentException("Attempt to register a null session");
+        }
+        _logger.debug("Add session with channel id  " + channelId);
+        _channelId2SessionMap.put(channelId, session);
+    }
+
+    /**
+     * Deregisters the session held against the given channel id, if any.
+     *
+     * @param channelId the channel id, which must be greater than zero
+     * @throws IllegalArgumentException if the channel id is not greater than zero
+     */
+    public void removeSessionByChannel(int channelId)
+    {
+        if (channelId <= 0)
+        {
+            throw new IllegalArgumentException("Attempt to deregister a session with a channel id <= zero");
+        }
+        _logger.debug("Removing session with channelId " + channelId);
+        _channelId2SessionMap.remove(channelId);
+    }
+
+    /**
+     * Starts the process of closing a session
+     * @param session the AMQSession being closed
+     * @throws IllegalArgumentException if the session's channel id is not greater than zero
+     */
+    public void closeSession(AMQSession session)
+    {
+        final int channelId = session.getChannelId();
+        _logger.debug("closeSession called on protocol session for session " + channelId);
+        if (channelId <= 0)
+        {
+            // message corrected to match the check above, which also rejects zero
+            throw new IllegalArgumentException("Attempt to close a channel with id <= zero");
+        }
+        // we need to know when a channel is closing so that we can respond
+        // with a channel.close frame when we receive any other type of frame
+        // on that channel
+        _closingChannels.putIfAbsent(channelId, session);
+    }
+
+    /**
+     * Called from the ChannelClose handler when a channel close frame is received.
+     * This method decides whether this is a response or an initiation. The latter
+     * case causes the AMQSession to be closed and an exception to be thrown if
+     * appropriate. If no session is registered for the channel a warning is logged
+     * and the close is still reported as server initiated.
+     * @param channelId the id of the channel (session)
+     * @param code the code passed to the AMQException handed to the session
+     * @param text the text passed to the AMQException handed to the session
+     * @return true if the client must respond to the server, i.e. if the server
+     * initiated the channel close, false if the channel close is just the server
+     * responding to the client's earlier request to close the channel.
+     */
+    public boolean channelClosed(int channelId, int code, String text)
+    {
+        final Integer chId = channelId;
+        // if this is a response to an earlier request to close the channel there is nothing to do
+        if (_closingChannels.remove(chId) != null)
+        {
+            return false;
+        }
+        final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+        if (session == null)
+        {
+            _logger.warn("Channel close received for channel " + channelId + " but no session is registered for that channel");
+        }
+        else
+        {
+            session.closed(new AMQException(_logger, code, text));
+        }
+        return true;
+    }
+
+    public AMQConnection getAMQConnection()
+    {
+        return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
+    }
+
+    /**
+     * Closes the underlying MINA session and blocks until the close future completes.
+     */
+    public void closeProtocolSession()
+    {
+        _logger.debug("Closing protocol session");
+        final CloseFuture future = _minaProtocolSession.close();
+        future.join();
+    }
+
+    /**
+     * Forwards a failover request to the protocol handler.
+     *
+     * @param host the host passed on to the handler
+     * @param port the port passed on to the handler
+     */
+    public void failover(String host, int port)
+    {
+        _protocolHandler.failover(host, port);
+    }
+
+    /**
+     * Builds a queue name from the local address of the MINA session and a counter that is
+     * incremented on every call.
+     *
+     * @return the generated queue name
+     */
+    String generateQueueName()
+    {
+        int id;
+        synchronized (_queueIdLock)
+        {
+            id = _queueId++;
+        }
+        //todo remove '/' and ':' from local Address as this doesn't conform to spec.
+        return "tmp_" + _minaProtocolSession.getLocalAddress() + "_" + id;
+    }
+
+    /**
+     * Configures the idle times on the MINA session that drive heartbeating. Does nothing
+     * when the delay is not positive.
+     *
+     * @param delay delay in seconds (not ms)
+     */
+    void initHeartbeats(int delay)
+    {
+        if (delay > 0)
+        {
+            // derive the reader timeout once so both consumers see the same value
+            final int readerTimeout = HeartbeatConfig.CONFIG.getTimeout(delay);
+            _minaProtocolSession.setIdleTime(IdleStatus.WRITER_IDLE, delay);
+            _minaProtocolSession.setIdleTime(IdleStatus.READER_IDLE, readerTimeout);
+            HeartbeatDiagnostics.init(delay, readerTimeout);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/AMQProtocolSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,133 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * A method listener that lets one thread block until a matching method frame has been
+ * processed on its channel, or until an error has been reported to the listener.
+ */
+public abstract class BlockingMethodFrameListener implements AMQMethodListener
+{
+    private volatile boolean _ready = false;
+
+    /**
+     * Decides whether the given method is the one being waited for.
+     *
+     * @param channelId the channel the method arrived on
+     * @param frame the method body
+     * @return true if the waiting thread should be released
+     * @throws AMQException if processing the method fails
+     */
+    public abstract boolean processMethod(int channelId, AMQMethodBody frame) throws AMQException;
+
+    private final Object _lock = new Object();
+
+    /**
+     * This is set if there is an exception thrown from processMethod and the
+     * exception is rethrown to the caller of blockForFrame()
+     */
+    private volatile Exception _error;
+
+    protected int _channelId;
+
+    protected AMQMethodEvent _doneEvt = null;
+
+    public BlockingMethodFrameListener(int channelId)
+    {
+        _channelId = channelId;
+    }
+
+    /**
+     * This method is called by the MINA dispatching thread. Note that it could
+     * be called before blockForFrame() has been called.
+     * @param evt the frame event
+     * @return true if the listener has dealt with this frame
+     * @throws AMQException if processMethod fails; the error is also recorded for blockForFrame()
+     */
+    public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+    {
+        AMQMethodBody method = evt.getMethod();
+
+        try
+        {
+            boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+            if (ready)
+            {
+                // we only update the flag from inside the synchronized block
+                // so that the blockForFrame method cannot "miss" an update - it
+                // will only ever read the flag from within the synchronized block
+                synchronized (_lock)
+                {
+                    _doneEvt = evt;
+                    _ready = ready;
+                    // wake every waiter; each one re-checks _ready before returning
+                    _lock.notifyAll();
+                }
+            }
+            return ready;
+        }
+        catch (AMQException e)
+        {
+            error(e);
+            // we rethrow the error here, and the code in the frame dispatcher will go round
+            // each listener informing them that an exception has been thrown
+            throw e;
+        }
+    }
+
+    /**
+     * This method is called by the thread that wants to wait for a frame. Interrupts do not
+     * end the wait, but the interrupt status of the thread is restored before returning.
+     *
+     * @return the event that released the wait, or null if none was recorded
+     * @throws AMQException if an error was reported while waiting
+     */
+    public AMQMethodEvent blockForFrame() throws AMQException
+    {
+        boolean interrupted = false;
+        synchronized (_lock)
+        {
+            while (!_ready)
+            {
+                try
+                {
+                    _lock.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    // keep waiting for the frame, but remember the interrupt; re-asserting it
+                    // here would make the next wait() throw immediately and spin
+                    interrupted = true;
+                }
+            }
+        }
+        if (interrupted)
+        {
+            // restore the interrupt status so that callers can still observe it
+            Thread.currentThread().interrupt();
+        }
+        if (_error != null)
+        {
+            if (_error instanceof AMQException)
+            {
+                throw (AMQException)_error;
+            }
+            else
+            {
+                throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+            }
+        }
+
+        return _doneEvt;
+    }
+
+    /**
+     * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
+     * class to avoid code repetition but again is only called by the MINA dispatcher thread.
+     * @param e the exception to hand to the thread blocked in blockForFrame()
+     */
+    public void error(Exception e)
+    {
+        synchronized (_lock)
+        {
+            // set the error before the flag so that the thread that is blocking (against
+            // blockForFrame()) can pick up the exception and rethrow to the caller
+            _error = e;
+            _ready = true;
+            _lock.notifyAll();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,57 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Holds the factor used to derive the heartbeat timeout from the delay between heartbeats.
+ * The factor can be overridden with the amqj.heartbeat.timeoutFactor system property.
+ */
+class HeartbeatConfig
+{
+    private static final Logger _logger = Logger.getLogger(HeartbeatConfig.class);
+
+    /** Factor used when the system property is absent or invalid. */
+    private static final float DEFAULT_TIMEOUT_FACTOR = 2.0f;
+
+    static final HeartbeatConfig CONFIG = new HeartbeatConfig();
+
+    /**
+     * The factor used to get the timeout from the delay between heartbeats.
+     */
+    private final float timeoutFactor;
+
+    /**
+     * Reads the factor from the amqj.heartbeat.timeoutFactor system property. Values that do
+     * not parse, or that are not finite positive numbers, are reported with a warning and the
+     * default is used instead.
+     */
+    HeartbeatConfig()
+    {
+        float factor = DEFAULT_TIMEOUT_FACTOR;
+        String property = System.getProperty("amqj.heartbeat.timeoutFactor");
+        if (property != null)
+        {
+            try
+            {
+                float parsed = Float.parseFloat(property);
+                // NaN fails the comparison; zero and negative values would make getTimeout()
+                // return a zero or negative timeout
+                if (parsed > 0 && !Float.isInfinite(parsed))
+                {
+                    factor = parsed;
+                }
+                else
+                {
+                    _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
+                }
+            }
+            catch (NumberFormatException e)
+            {
+                _logger.warn("Invalid timeout factor (amqj.heartbeat.timeoutFactor): " + property);
+            }
+        }
+        timeoutFactor = factor;
+    }
+
+    /**
+     * @return the factor applied to the heartbeat delay to obtain the timeout
+     */
+    float getTimeoutFactor()
+    {
+        return timeoutFactor;
+    }
+
+    /**
+     * @param writeDelay the delay between heartbeats
+     * @return the delay multiplied by the timeout factor, truncated to an int
+     */
+    int getTimeout(int writeDelay)
+    {
+        return (int) (timeoutFactor * writeDelay);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatConfig.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.client.protocol;
+
+/**
+ * Optional heartbeat tracing. When the amqj.heartbeat.diagnostics system property is true,
+ * recent heartbeat activity is kept in memory and printed to standard out on a timeout;
+ * otherwise every call is a no-op.
+ */
+class HeartbeatDiagnostics
+{
+    private static final Diagnostics _impl = init();
+
+    /** Selects the implementation once, from the amqj.heartbeat.diagnostics system property. */
+    private static Diagnostics init()
+    {
+        return Boolean.getBoolean("amqj.heartbeat.diagnostics") ? new On() : new Off();
+    }
+
+    static void sent()
+    {
+        _impl.sent();
+    }
+
+    static void timeout()
+    {
+        _impl.timeout();
+    }
+
+    static void received(boolean heartbeat)
+    {
+        _impl.received(heartbeat);
+    }
+
+    static void init(int delay, int timeout)
+    {
+        _impl.init(delay, timeout);
+    }
+
+    private static interface Diagnostics
+    {
+        void sent();
+        void timeout();
+        void received(boolean heartbeat);
+        void init(int delay, int timeout);
+    }
+
+    /** Records the most recent events in a fixed-size circular buffer. */
+    private static class On implements Diagnostics
+    {
+        private final String[] messages = new String[50];
+
+        /**
+         * Index of the next slot to be written. Once the buffer has wrapped this is also
+         * the slot holding the oldest entry.
+         */
+        private int next;
+
+        private void save(String msg)
+        {
+            final int slot = next;
+            messages[slot] = msg;
+            // wrap around, i.e. a circular buffer
+            next = (slot + 1) % messages.length;
+        }
+
+        public void sent()
+        {
+            save(System.currentTimeMillis() + ": sent heartbeat");
+        }
+
+        /**
+         * Prints the recorded events, oldest first, followed by the timeout itself.
+         */
+        public void timeout()
+        {
+            // start at the oldest slot rather than index 0 so that the output stays in
+            // chronological order after the buffer has wrapped; unused slots are skipped
+            final int oldest = next;
+            for (int offset = 0; offset < messages.length; offset++)
+            {
+                final String msg = messages[(oldest + offset) % messages.length];
+                if (msg != null)
+                {
+                    System.out.println(msg);
+                }
+            }
+            System.out.println(System.currentTimeMillis() + ": timed out");
+        }
+
+        public void received(boolean heartbeat)
+        {
+            save(System.currentTimeMillis() + ": received " + (heartbeat ? "heartbeat" : "data"));
+        }
+
+        public void init(int delay, int timeout)
+        {
+            System.out.println(System.currentTimeMillis() + ": initialised delay=" + delay + ", timeout=" + timeout);
+        }
+    }
+
+    /** No-op implementation used when diagnostics are disabled. */
+    private static class Off implements Diagnostics
+    {
+        public void sent()
+        {
+
+        }
+        public void timeout()
+        {
+
+        }
+        public void received(boolean heartbeat)
+        {
+
+        }
+
+        public void init(int delay, int timeout)
+        {
+
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/org/apache/qpid/client/protocol/HeartbeatDiagnostics.java
------------------------------------------------------------------------------
    svn:eol-style = native