You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/07/27 18:10:32 UTC

svn commit: r560296 [3/3] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient: ./ api/ jms/ jms/message/

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,507 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import java.nio.charset.CharacterCodingException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
+{
+    private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
+
+    public static final String MIME_TYPE = "jms/map-message";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+    private Map<String, Object> _map = new HashMap<String, Object>();
+
+    public JMSMapMessage() throws JMSException
+    {
+        this(null);
+    }
+
+    JMSMapMessage(ByteBuffer data) throws JMSException
+    {
+        super(data); // this instantiates a content header
+        populateMapFromData();
+    }
+
+    JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+        ByteBuffer data) throws AMQException
+    {
+        super(messageNbr, contentHeader, exchange, routingKey, data);
+        try
+        {
+            populateMapFromData();
+        }
+        catch (JMSException je)
+        {
+            throw new AMQException(null, "Error populating MapMessage from ByteBuffer", je);
+
+        }
+
+    }
+
+    public String toBodyString() throws JMSException
+    {
+        return _map.toString();
+    }
+
+    public AMQShortString getMimeTypeAsShortString()
+    {
+        return MIME_TYPE_SHORT_STRING;
+    }
+
+    public ByteBuffer getData()
+    {
+        // What if _data is null?
+        writeMapToData();
+
+        return super.getData();
+    }
+
+    @Override
+    public void clearBodyImpl() throws JMSException
+    {
+        super.clearBodyImpl();
+        _map.clear();
+    }
+
+    public boolean getBoolean(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Boolean)
+        {
+            return ((Boolean) value).booleanValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Boolean.valueOf((String) value);
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to boolean.");
+        }
+
+    }
+
+    public byte getByte(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Byte)
+        {
+            return ((Byte) value).byteValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Byte.valueOf((String) value).byteValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to byte.");
+        }
+    }
+
+    public short getShort(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Short)
+        {
+            return ((Short) value).shortValue();
+        }
+        else if (value instanceof Byte)
+        {
+            return ((Byte) value).shortValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Short.valueOf((String) value).shortValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to short.");
+        }
+
+    }
+
+    public int getInt(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Integer)
+        {
+            return ((Integer) value).intValue();
+        }
+        else if (value instanceof Short)
+        {
+            return ((Short) value).intValue();
+        }
+        else if (value instanceof Byte)
+        {
+            return ((Byte) value).intValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Integer.valueOf((String) value).intValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to int.");
+        }
+
+    }
+
+    public long getLong(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Long)
+        {
+            return ((Long) value).longValue();
+        }
+        else if (value instanceof Integer)
+        {
+            return ((Integer) value).longValue();
+        }
+
+        if (value instanceof Short)
+        {
+            return ((Short) value).longValue();
+        }
+
+        if (value instanceof Byte)
+        {
+            return ((Byte) value).longValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Long.valueOf((String) value).longValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to long.");
+        }
+
+    }
+
+    public char getChar(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (!_map.containsKey(propName))
+        {
+            throw new MessageFormatException("Property " + propName + " not present");
+        }
+        else if (value instanceof Character)
+        {
+            return ((Character) value).charValue();
+        }
+        else if (value == null)
+        {
+            throw new NullPointerException("Property " + propName + " has null value and therefore cannot "
+                + "be converted to char.");
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to boolan.");
+        }
+
+    }
+
+    public float getFloat(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Float)
+        {
+            return ((Float) value).floatValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Float.valueOf((String) value).floatValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to float.");
+        }
+    }
+
+    public double getDouble(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (value instanceof Double)
+        {
+            return ((Double) value).doubleValue();
+        }
+        else if (value instanceof Float)
+        {
+            return ((Float) value).doubleValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Double.valueOf((String) value).doubleValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to double.");
+        }
+    }
+
+    public String getString(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if ((value instanceof String) || (value == null))
+        {
+            return (String) value;
+        }
+        else if (value instanceof byte[])
+        {
+            throw new MessageFormatException("Property " + propName + " of type byte[] " + "cannot be converted to String.");
+        }
+        else
+        {
+            return value.toString();
+        }
+
+    }
+
+    public byte[] getBytes(String propName) throws JMSException
+    {
+        Object value = _map.get(propName);
+
+        if (!_map.containsKey(propName))
+        {
+            throw new MessageFormatException("Property " + propName + " not present");
+        }
+        else if ((value instanceof byte[]) || (value == null))
+        {
+            return (byte[]) value;
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + propName + " of type " + value.getClass().getName()
+                + " cannot be converted to byte[].");
+        }
+    }
+
+    public Object getObject(String propName) throws JMSException
+    {
+        return _map.get(propName);
+    }
+
+    public Enumeration getMapNames() throws JMSException
+    {
+        return Collections.enumeration(_map.keySet());
+    }
+
+    public void setBoolean(String propName, boolean b) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, b);
+    }
+
+    public void setByte(String propName, byte b) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, b);
+    }
+
+    public void setShort(String propName, short i) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, i);
+    }
+
+    public void setChar(String propName, char c) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, c);
+    }
+
+    public void setInt(String propName, int i) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, i);
+    }
+
+    public void setLong(String propName, long l) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, l);
+    }
+
+    public void setFloat(String propName, float v) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, v);
+    }
+
+    public void setDouble(String propName, double v) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, v);
+    }
+
+    public void setString(String propName, String string1) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, string1);
+    }
+
+    public void setBytes(String propName, byte[] bytes) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        _map.put(propName, bytes);
+    }
+
+    public void setBytes(String propName, byte[] bytes, int offset, int length) throws JMSException
+    {
+        if ((offset == 0) && (length == bytes.length))
+        {
+            setBytes(propName, bytes);
+        }
+        else
+        {
+            byte[] newBytes = new byte[length];
+            System.arraycopy(bytes, offset, newBytes, 0, length);
+            setBytes(propName, newBytes);
+        }
+    }
+
+    public void setObject(String propName, Object value) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(propName);
+        if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+                || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+                || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+        {
+            _map.put(propName, value);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot set property " + propName + " to value " + value + "of type "
+                + value.getClass().getName() + ".");
+        }
+    }
+
+    private void checkPropertyName(String propName)
+    {
+        if ((propName == null) || propName.equals(""))
+        {
+            throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+        }
+    }
+
+    public boolean itemExists(String propName) throws JMSException
+    {
+        return _map.containsKey(propName);
+    }
+
+    private void populateMapFromData() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.rewind();
+
+            final int entries = readIntImpl();
+            for (int i = 0; i < entries; i++)
+            {
+                String propName = readStringImpl();
+                Object value = readObject();
+                _map.put(propName, value);
+            }
+        }
+        else
+        {
+            _map.clear();
+        }
+    }
+
+    private void writeMapToData()
+    {
+        allocateInitialBuffer();
+        final int size = _map.size();
+        writeIntImpl(size);
+        for (Map.Entry<String, Object> entry : _map.entrySet())
+        {
+            try
+            {
+                writeStringImpl(entry.getKey());
+            }
+            catch (CharacterCodingException e)
+            {
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
+
+            }
+
+            try
+            {
+                writeObject(entry.getValue());
+            }
+            catch (JMSException e)
+            {
+                Object value = entry.getValue();
+                throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
+                    + " (type: " + value.getClass().getName() + ").", e);
+            }
+        }
+
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ *
+ * 
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSMapMessageFactory extends AbstractJMSMessageFactory
+{
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSMapMessage();
+    }
+
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSMapMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,197 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import javax.jms.ObjectMessage;
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
+{
+    public static final String MIME_TYPE = "application/java-object-stream";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+    private static final int DEFAULT_BUFFER_SIZE = 1024;
+
+    /**
+     * Creates empty, writable message for use by producers
+     */
+    public JMSObjectMessage()
+    {
+        this(null);
+    }
+
+    private JMSObjectMessage(ByteBuffer data)
+    {
+        super(data);
+        if (data == null)
+        {
+            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+            _data.setAutoExpand(true);
+        }
+
+        getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+    }
+
+    /**
+     * Creates read only message for delivery to consumers
+     */
+    JMSObjectMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange, AMQShortString routingKey,
+        ByteBuffer data) throws AMQException
+    {
+        super(messageNbr, (BasicContentHeaderProperties) contentHeader.properties, exchange, routingKey, data);
+    }
+
+    public void clearBodyImpl() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.release();
+        }
+
+        _data = null;
+
+    }
+
+    public String toBodyString() throws JMSException
+    {
+        return toString(_data);
+    }
+
+    public AMQShortString getMimeTypeAsShortString()
+    {
+        return MIME_TYPE_SHORT_STRING;
+    }
+
+    public void setObject(Serializable serializable) throws JMSException
+    {
+        checkWritable();
+
+        if (_data == null)
+        {
+            _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+            _data.setAutoExpand(true);
+        }
+        else
+        {
+            _data.rewind();
+        }
+
+        try
+        {
+            ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
+            out.writeObject(serializable);
+            out.flush();
+            out.close();
+        }
+        catch (IOException e)
+        {
+            MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
+            mfe.setLinkedException(e);
+            throw mfe;
+        }
+
+    }
+
+    public Serializable getObject() throws JMSException
+    {
+        ObjectInputStream in = null;
+        if (_data == null)
+        {
+            return null;
+        }
+
+        try
+        {
+            _data.rewind();
+            in = new ObjectInputStream(_data.asInputStream());
+
+            return (Serializable) in.readObject();
+        }
+        catch (IOException e)
+        {
+            MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+            mfe.setLinkedException(e);
+            throw mfe;
+        }
+        catch (ClassNotFoundException e)
+        {
+            MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
+            mfe.setLinkedException(e);
+            throw mfe;
+        }
+        finally
+        {
+            _data.rewind();
+            close(in);
+        }
+    }
+
+    private static void close(InputStream in)
+    {
+        try
+        {
+            if (in != null)
+            {
+                in.close();
+            }
+        }
+        catch (IOException ignore)
+        { }
+    }
+
+    private static String toString(ByteBuffer data)
+    {
+        if (data == null)
+        {
+            return null;
+        }
+
+        int pos = data.position();
+        try
+        {
+            return data.getString(Charset.forName("UTF8").newDecoder());
+        }
+        catch (CharacterCodingException e)
+        {
+            return null;
+        }
+        finally
+        {
+            data.position(pos);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
+{
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+    }
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSObjectMessage();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSObjectMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
+{
+    public static final String MIME_TYPE="jms/stream-message";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+
+    /**
+     * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
+     * a byte array in multiple chunks, hence this is used to track how much is left to be read
+     */
+    private int _byteArrayRemaining = -1;
+
+    public JMSStreamMessage()
+    {
+        this(null);
+    }
+
+    /**
+     * Construct a stream message with existing data.
+     *
+     * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
+     *             set to auto expand
+     */
+    JMSStreamMessage(ByteBuffer data)
+    {
+        super(data); // this instanties a content header
+    }
+
+
+    JMSStreamMessage(long messageNbr, ContentHeaderBody contentHeader, AMQShortString exchange,
+                     AMQShortString routingKey, ByteBuffer data) throws AMQException
+    {
+        super(messageNbr, contentHeader, exchange, routingKey, data);
+    }
+
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
+    public AMQShortString getMimeTypeAsShortString()
+    {
+        return MIME_TYPE_SHORT_STRING;
+    }
+
+
+
+    public boolean readBoolean() throws JMSException
+    {
+        return super.readBoolean();
+    }
+
+
+    public byte readByte() throws JMSException
+    {
+        return super.readByte();
+    }
+
+    public short readShort() throws JMSException
+    {
+        return super.readShort();
+    }
+
+    /**
+     * Note that this method reads a unicode character as two bytes from the stream
+     *
+     * @return the character read from the stream
+     * @throws JMSException
+     */
+    public char readChar() throws JMSException
+    {
+        return super.readChar();
+    }
+
+    public int readInt() throws JMSException
+    {
+        return super.readInt();
+    }
+
+    public long readLong() throws JMSException
+    {
+        return super.readLong();
+    }
+
+    public float readFloat() throws JMSException
+    {
+        return super.readFloat();
+    }
+
+    public double readDouble() throws JMSException
+    {
+        return super.readDouble();
+    }
+
+    public String readString() throws JMSException
+    {
+        return super.readString();
+    }
+
+    public int readBytes(byte[] bytes) throws JMSException
+    {
+        return super.readBytes(bytes);
+    }
+
+
+    public Object readObject() throws JMSException
+    {
+        return super.readObject();
+    }
+
+    public void writeBoolean(boolean b) throws JMSException
+    {
+        super.writeBoolean(b);
+    }
+
+    public void writeByte(byte b) throws JMSException
+    {
+        super.writeByte(b);
+    }
+
+    public void writeShort(short i) throws JMSException
+    {
+        super.writeShort(i);
+    }
+
+    public void writeChar(char c) throws JMSException
+    {
+        super.writeChar(c);
+    }
+
+    public void writeInt(int i) throws JMSException
+    {
+        super.writeInt(i);
+    }
+
+    public void writeLong(long l) throws JMSException
+    {
+        super.writeLong(l);
+    }
+
+    public void writeFloat(float v) throws JMSException
+    {
+        super.writeFloat(v);
+    }
+
+    public void writeDouble(double v) throws JMSException
+    {
+        super.writeDouble(v);
+    }
+
+    public void writeString(String string) throws JMSException
+    {
+        super.writeString(string);
+    }
+
+    public void writeBytes(byte[] bytes) throws JMSException
+    {
+        super.writeBytes(bytes);
+    }
+
+    public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+    {
+        super.writeBytes(bytes,offset,length);
+    }
+
+    public void writeObject(Object object) throws JMSException
+    {
+        super.writeObject(object);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
+{
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey,
+                                               ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+    }
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSStreamMessage();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSStreamMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+
+public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
+{
+    private static final String MIME_TYPE = "text/plain";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
+
+    private String _decodedValue;
+
+    /**
+     * This constant represents the name of a property that is set when the message payload is null.
+     */
+    private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
+    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
+
+    public JMSTextMessage() throws JMSException
+    {
+        this(null, null);
+    }
+
+    JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+    {
+        super(data); // this instantiates a content header
+        getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+        getContentHeaderProperties().setEncoding(encoding);
+    }
+
+    JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
+                   AMQShortString routingKey, ByteBuffer data)
+            throws AMQException
+    {
+        super(deliveryTag, contentHeader, exchange, routingKey, data);
+        contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
+        _data = data;
+    }
+
+    JMSTextMessage(ByteBuffer data) throws JMSException
+    {
+        this(data, null);
+    }
+
+    JMSTextMessage(String text) throws JMSException
+    {
+        super((ByteBuffer) null);
+        setText(text);
+    }
+
+    public void clearBodyImpl() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.release();
+        }
+        _data = null;
+        _decodedValue = null;
+    }
+
+    public String toBodyString() throws JMSException
+    {
+        return getText();
+    }
+
+    public void setData(ByteBuffer data)
+    {
+        _data = data;
+    }
+
+    public AMQShortString getMimeTypeAsShortString()
+    {
+        return MIME_TYPE_SHORT_STRING;
+    }
+
+    public void setText(String text) throws JMSException
+    {
+        checkWritable();
+
+        clearBody();
+        try
+        {
+            if (text != null)
+            {                
+                _data = ByteBuffer.allocate(text.length());
+                _data.limit(text.length()) ;
+                //_data.sweep();
+                _data.setAutoExpand(true);
+                final String encoding = getContentHeaderProperties().getEncodingAsString();
+                if (encoding == null)
+                {
+                    _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+                }
+                else
+                {
+                    _data.put(text.getBytes(encoding));
+                }
+                _changedData=true;
+            }
+            _decodedValue = text;
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            // should never occur
+            JMSException jmse = new JMSException("Unable to decode text data");
+            jmse.setLinkedException(e);
+        }
+    }
+
+    public String getText() throws JMSException
+    {
+        if (_data == null && _decodedValue == null)
+        {
+            return null;
+        }
+        else if (_decodedValue != null)
+        {
+            return _decodedValue;
+        }
+        else
+        {
+            _data.rewind();
+
+            if (propertyExists(PAYLOAD_NULL_PROPERTY) && getBooleanProperty(PAYLOAD_NULL_PROPERTY))
+            {
+                return null;
+            }
+            if (getContentHeaderProperties().getEncodingAsString() != null)
+            {
+                try
+                {
+                    _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
+                }
+                catch (CharacterCodingException e)
+                {
+                    JMSException je = new JMSException("Could not decode string data: " + e);
+                    je.setLinkedException(e);
+                    throw je;
+                }
+            }
+            else
+            {
+                try
+                {
+                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
+                }
+                catch (CharacterCodingException e)
+                {
+                    JMSException je = new JMSException("Could not decode string data: " + e);
+                    je.setLinkedException(e);
+                    throw je;
+                }
+            }
+            return _decodedValue;
+        }
+    }
+
+    @Override
+    public void prepareForSending() throws JMSException
+    {
+        super.prepareForSending();
+        if (_data == null)
+        {
+            setBooleanProperty(PAYLOAD_NULL_PROPERTY, true);
+        }
+        else
+        {
+            removeProperty(PAYLOAD_NULL_PROPERTY);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class JMSTextMessageFactory extends AbstractJMSMessageFactory
+{
+
+    public AbstractJMSMessage createMessage() throws JMSException
+    {
+        return new JMSTextMessage();
+    }
+
+    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
+                                               AMQShortString exchange, AMQShortString routingKey, 
+                                               ContentHeaderBody contentHeader) throws AMQException
+    {
+        return new JMSTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.properties, 
+                                  exchange, routingKey, data);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/JMSTextMessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,202 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageEOFException;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import java.util.Enumeration;
+
+public class MessageConverter
+{
+
+    /**
+     * Log4J logger
+     */
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * AbstractJMSMessage which will hold the converted message
+     */
+    private AbstractJMSMessage _newMessage;
+
+    public MessageConverter(AbstractJMSMessage message) throws JMSException
+    {
+        _newMessage = message;
+    }
+
+    public MessageConverter(BytesMessage message) throws JMSException
+    {
+        BytesMessage bytesMessage = (BytesMessage) message;
+        bytesMessage.reset();
+
+        JMSBytesMessage nativeMsg = new JMSBytesMessage();
+
+        byte[] buf = new byte[1024];
+
+        int len;
+
+        while ((len = bytesMessage.readBytes(buf)) != -1)
+        {
+            nativeMsg.writeBytes(buf, 0, len);
+        }
+
+        _newMessage = nativeMsg;
+        setMessageProperties(message);
+    }
+
+    public MessageConverter(MapMessage message) throws JMSException
+    {
+        MapMessage nativeMessage = new JMSMapMessage();
+
+        Enumeration mapNames = message.getMapNames();
+        while (mapNames.hasMoreElements())
+        {
+            String name = (String) mapNames.nextElement();
+            nativeMessage.setObject(name, message.getObject(name));
+        }
+
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+    }
+
+    public MessageConverter(ObjectMessage message) throws JMSException
+    {
+        ObjectMessage origMessage = (ObjectMessage) message;
+        ObjectMessage nativeMessage = new JMSObjectMessage();
+
+        nativeMessage.setObject(origMessage.getObject());
+
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+
+    }
+
+    public MessageConverter(TextMessage message) throws JMSException
+    {
+        TextMessage nativeMessage = new JMSTextMessage();
+
+        nativeMessage.setText(message.getText());
+
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+    }
+
+    public MessageConverter(StreamMessage message) throws JMSException
+    {
+        StreamMessage nativeMessage = new JMSStreamMessage();
+
+        try
+        {
+            message.reset();
+            while (true)
+            {
+                nativeMessage.writeObject(message.readObject());
+            }
+        }
+        catch (MessageEOFException e)
+        {
+            // we're at the end so don't mind the exception
+        }
+
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+    }
+
+    public MessageConverter(Message message) throws JMSException
+    {
+        // Send a message with just properties.
+        // Throwing away content
+        BytesMessage nativeMessage = new JMSBytesMessage();
+
+        _newMessage = (AbstractJMSMessage) nativeMessage;
+        setMessageProperties(message);
+    }
+
+    public AbstractJMSMessage getConvertedMessage()
+    {
+        return _newMessage;
+    }
+
+    /**
+     * Sets all message properties
+     */
+    protected void setMessageProperties(Message message) throws JMSException
+    {
+        setNonJMSProperties(message);
+        setJMSProperties(message);
+    }
+
+    /**
+     * Sets all non-JMS defined properties on converted message
+     */
+    protected void setNonJMSProperties(Message message) throws JMSException
+    {
+        Enumeration propertyNames = message.getPropertyNames();
+        while (propertyNames.hasMoreElements())
+        {
+            String propertyName = String.valueOf(propertyNames.nextElement());
+            // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+            if (!propertyName.startsWith("JMSX_"))
+            {
+                Object value = message.getObjectProperty(propertyName);
+                _newMessage.setObjectProperty(propertyName, value);
+            }
+        }
+    }
+
+    /**
+     * Exposed JMS defined properties on converted message:
+     * JMSDestination   - we don't set here
+     * JMSDeliveryMode  - set
+     * JMSExpiration    - we don't set here
+     * JMSPriority      - we don't set here
+     * JMSMessageID     - we don't set here
+     * JMSTimestamp     - we don't set here
+     * JMSCorrelationID - set
+     * JMSReplyTo       - set
+     * JMSType          - set
+     * JMSRedlivered    - we don't set here
+     */
+    protected void setJMSProperties(Message message) throws JMSException
+    {
+        _newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+        if (message.getJMSReplyTo() != null)
+        {
+            _newMessage.setJMSReplyTo(message.getJMSReplyTo());
+        }
+
+        _newMessage.setJMSType(message.getJMSType());
+
+        _newMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+
+public interface MessageFactory
+{
+    AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
+                                     ContentHeaderBody contentHeader,
+                                     AMQShortString exchange, AMQShortString routingKey,
+                                     List bodies)
+        throws JMSException, AMQException;
+
+    AbstractJMSMessage createMessage() throws JMSException;
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+public class MessageFactoryRegistry
+{
+    private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
+    private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap =
+        new HashMap<AMQShortString, MessageFactory>();
+
+    /**
+     * Construct a new registry with the default message factories registered
+     * @return a message factory registry
+     */
+    public static MessageFactoryRegistry newDefaultRegistry()
+    {
+        MessageFactoryRegistry mf = new MessageFactoryRegistry();
+        mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
+        mf.registerFactory("text/plain", new JMSTextMessageFactory());
+        mf.registerFactory("text/xml", new JMSTextMessageFactory());
+        mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
+        mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
+        mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+        mf.registerFactory(null, new JMSBytesMessageFactory());
+
+        return mf;
+    }
+
+    public void registerFactory(String mimeType, MessageFactory mf)
+    {
+        if (mf == null)
+        {
+            throw new IllegalArgumentException("Message factory must not be null");
+        }
+
+        _mimeStringToFactoryMap.put(mimeType, mf);
+        _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
+    }
+
+    public MessageFactory deregisterFactory(String mimeType)
+    {
+        _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+
+        return _mimeStringToFactoryMap.remove(mimeType);
+    }
+
+    /**
+     * Create a message. This looks up the MIME type from the content header and instantiates the appropriate
+     * concrete message type.
+     * @param deliveryTag the AMQ message id
+     * @param redelivered true if redelivered
+     * @param contentHeader the content header that was received
+     * @param bodies a list of ContentBody instances
+     * @return the message.
+     * @throws AMQException
+     * @throws JMSException
+     */
+    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
+                                            AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
+                                     throws AMQException, JMSException
+    {
+        BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+
+        // Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
+        // AMQP. When the type is null, it can only be assumed that the message is a byte message.
+        AMQShortString contentTypeShortString = properties.getContentType();
+        contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE)
+                                                                  : contentTypeShortString;
+
+        MessageFactory mf = _mimeShortStringToFactoryMap.get(contentTypeShortString);
+        if (mf == null)
+        {
+            throw new AMQException(null, "Unsupport MIME type of " + properties.getContentTypeAsString(), null);
+        }
+        else
+        {
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies);
+        }
+    }
+
+    public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+    {
+        if (mimeType == null)
+        {
+            throw new IllegalArgumentException("Mime type must not be null");
+        }
+
+        MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
+        if (mf == null)
+        {
+            throw new AMQException(null, "Unsupport MIME type of " + mimeType, null);
+        }
+        else
+        {
+            return mf.createMessage();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/MessageFactoryRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,135 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.framing.ContentHeaderProperties;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+import java.math.BigDecimal;
+
+public class QpidMessage
+{
+    protected ContentHeaderProperties _contentHeaderProperties;
+
+    /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
+    protected AMQSession _session;
+
+    protected final long _deliveryTag;
+
+    public QpidMessage(ContentHeaderProperties properties, long deliveryTag)
+    {
+        _contentHeaderProperties = properties;
+        _deliveryTag = deliveryTag;
+    }
+
+    public QpidMessage(ContentHeaderProperties properties)
+    {
+        this(properties, -1);
+    }
+
+    /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        _session = s;
+    }
+
+    public AMQSession getAMQSession()
+    {
+        return _session;
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     *
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
+    /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
+    public void prepareForSending() throws JMSException
+    {
+    }
+
+    public FieldTable getPropertyHeaders()
+    {
+        return ((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders();
+    }
+
+    public void setDecimalProperty(AMQShortString propertyName, BigDecimal bd) throws JMSException
+    {
+        getPropertyHeaders().setDecimal(propertyName, bd);
+    }
+
+    public void setIntProperty(AMQShortString propertyName, int i) throws JMSException
+    {
+        getPropertyHeaders().setInteger(propertyName, new Integer(i));
+    }
+
+    public void setLongStringProperty(AMQShortString propertyName, String value)
+    {
+        getPropertyHeaders().setString(propertyName, value);
+    }
+
+    public void setTimestampProperty(AMQShortString propertyName, long value)
+    {
+        getPropertyHeaders().setTimestamp(propertyName, value);
+    }
+
+    public void setVoidProperty(AMQShortString propertyName)
+    {
+        getPropertyHeaders().setVoid(propertyName);
+    }
+
+    //** Getters
+
+    public BigDecimal getDecimalProperty(AMQShortString propertyName) throws JMSException
+    {
+        return getPropertyHeaders().getDecimal(propertyName);
+    }
+
+    public int getIntegerProperty(AMQShortString propertyName) throws JMSException
+    {
+        return getPropertyHeaders().getInteger(propertyName);
+    }
+
+    public String getLongStringProperty(AMQShortString propertyName)
+    {
+        return getPropertyHeaders().getString(propertyName);
+    }
+
+    public Long getTimestampProperty(AMQShortString propertyName)
+    {
+        return getPropertyHeaders().getTimestamp(propertyName);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/QpidMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java?view=auto&rev=560296
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java Fri Jul 27 09:10:27 2007
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package  org.apache.qpid.nclient.jms.message;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
+ *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
+ */
+public class UnprocessedMessage
+{
+    private long _bytesReceived = 0;
+
+    private final BasicDeliverBody _deliverBody;
+    private final BasicReturnBody _bounceBody; // TODO: check change (gustavo)
+    private final int _channelId;
+    private ContentHeaderBody _contentHeader;
+
+    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
+    private List<ContentBody> _bodies;
+
+    public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
+    {
+        _deliverBody = deliverBody;
+        _channelId = channelId;
+        _bounceBody = null;
+    }
+
+
+    public UnprocessedMessage(int channelId, BasicReturnBody bounceBody)
+    {
+        _deliverBody = null;
+        _channelId = channelId;
+        _bounceBody = bounceBody;
+    }
+
+    public void receiveBody(ContentBody body) //throws UnexpectedBodyReceivedException
+    {
+
+        if (body.payload != null)
+        {
+            final long payloadSize = body.payload.remaining();
+
+            if (_bodies == null)
+            {
+                if (payloadSize == getContentHeader().bodySize)
+                {
+                    _bodies = Collections.singletonList(body);
+                }
+                else
+                {
+                    _bodies = new ArrayList<ContentBody>();
+                    _bodies.add(body);
+                }
+
+            }
+            else
+            {
+                _bodies.add(body);
+            }
+            _bytesReceived += payloadSize;
+        }
+    }
+
+    public boolean isAllBodyDataReceived()
+    {
+        return _bytesReceived == getContentHeader().bodySize;
+    }
+
+    public BasicDeliverBody getDeliverBody()
+    {
+        return _deliverBody;
+    }
+
+    public BasicReturnBody getBounceBody()
+    {
+        return _bounceBody;
+    }
+
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+
+    public ContentHeaderBody getContentHeader()
+    {
+        return _contentHeader;
+    }
+
+    public void setContentHeader(ContentHeaderBody contentHeader)
+    {
+        this._contentHeader = contentHeader;
+    }
+
+    public List<ContentBody> getBodies()
+    {
+        return _bodies;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/message/UnprocessedMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native