You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 16:33:07 UTC

svn commit: r1526190 [3/7] - in /qpid/trunk/qpid/java/amqp-1-0-client-jms: example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ src/main/java/org/apache/qpid/amqp_1_0/jms/ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ src/main/java/org/apache...

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java Wed Sep 25 14:33:06 2013
@@ -1,105 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
-
-import javax.jms.JMSException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-
-public class ConnectionMetaDataImpl implements ConnectionMetaData
-{
-    private static final int JMS_MAJOR_VERSION = 1;
-    private static final int JMS_MINOR_VERSION = 1;
-
-    private static final int PROVIDER_MAJOR_VERSION = 1;
-    private static final int PROVIDER_MINOR_VERSION = 0;
-
-
-    private final int _amqpMajorVersion;
-    private final int _amqpMinorVersion;
-    private final int _amqpRevisionVersion;
-    private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
-
-    public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
-    {
-        _amqpMajorVersion = amqpMajorVersion;
-        _amqpMinorVersion = amqpMinorVersion;
-        _amqpRevisionVersion = amqpRevisionVersion;
-    }
-
-    public String getJMSVersion() throws JMSException
-    {
-        return getJMSMajorVersion() + "." + getJMSMinorVersion();
-    }
-
-    public int getJMSMajorVersion() throws JMSException
-    {
-        return JMS_MAJOR_VERSION;
-    }
-
-    public int getJMSMinorVersion() throws JMSException
-    {
-        return JMS_MINOR_VERSION;
-    }
-
-    public String getJMSProviderName() throws JMSException
-    {
-        return "AMQP.ORG";
-    }
-
-    public String getProviderVersion() throws JMSException
-    {
-        return getProviderMajorVersion() + "." + getProviderMinorVersion();
-    }
-
-    public int getProviderMajorVersion() throws JMSException
-    {
-        return PROVIDER_MAJOR_VERSION;
-    }
-
-    public int getProviderMinorVersion() throws JMSException
-    {
-        return PROVIDER_MINOR_VERSION;
-    }
-
-    public Enumeration getJMSXPropertyNames() throws JMSException
-    {
-
-        return Collections.enumeration(_jmsxProperties);
-    }
-
-    public int getAMQPMajorVersion()
-    {
-        return _amqpMajorVersion;
-    }
-
-    public int getAMQPMinorVersion()
-    {
-        return _amqpMinorVersion;
-    }
-
-    public int getAMQPRevisionVersion()
-    {
-        return _amqpRevisionVersion;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+
+import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+    private static final int JMS_MAJOR_VERSION = 1;
+    private static final int JMS_MINOR_VERSION = 1;
+
+    private static final int PROVIDER_MAJOR_VERSION = 1;
+    private static final int PROVIDER_MINOR_VERSION = 0;
+
+
+    private final int _amqpMajorVersion;
+    private final int _amqpMinorVersion;
+    private final int _amqpRevisionVersion;
+    private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
+
+    public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
+    {
+        _amqpMajorVersion = amqpMajorVersion;
+        _amqpMinorVersion = amqpMinorVersion;
+        _amqpRevisionVersion = amqpRevisionVersion;
+    }
+
+    public String getJMSVersion() throws JMSException
+    {
+        return getJMSMajorVersion() + "." + getJMSMinorVersion();
+    }
+
+    public int getJMSMajorVersion() throws JMSException
+    {
+        return JMS_MAJOR_VERSION;
+    }
+
+    public int getJMSMinorVersion() throws JMSException
+    {
+        return JMS_MINOR_VERSION;
+    }
+
+    public String getJMSProviderName() throws JMSException
+    {
+        return "AMQP.ORG";
+    }
+
+    public String getProviderVersion() throws JMSException
+    {
+        return getProviderMajorVersion() + "." + getProviderMinorVersion();
+    }
+
+    public int getProviderMajorVersion() throws JMSException
+    {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    public int getProviderMinorVersion() throws JMSException
+    {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    public Enumeration getJMSXPropertyNames() throws JMSException
+    {
+
+        return Collections.enumeration(_jmsxProperties);
+    }
+
+    public int getAMQPMajorVersion()
+    {
+        return _amqpMajorVersion;
+    }
+
+    public int getAMQPMinorVersion()
+    {
+        return _amqpMinorVersion;
+    }
+
+    public int getAMQPRevisionVersion()
+    {
+        return _amqpRevisionVersion;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java Wed Sep 25 14:33:06 2013
@@ -1,85 +1,85 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.Destination;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.Topic;
-
-import javax.jms.JMSException;
-import java.util.WeakHashMap;
-
-public class DestinationImpl implements Destination, Queue, Topic
-{
-    private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
-            new WeakHashMap<String, DestinationImpl>();
-
-    private final String _address;
-
-    protected DestinationImpl(String address)
-    {
-        _address = address;
-    }
-
-    public String getAddress()
-    {
-        return _address;
-    }
-
-    public static DestinationImpl valueOf(String address)
-    {
-        return address == null ? null : createDestination(address);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return _address.hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object obj)
-    {
-        return obj != null
-               && obj.getClass() == getClass()
-               && _address.equals(((DestinationImpl)obj)._address);
-    }
-
-    public static synchronized DestinationImpl createDestination(final String address)
-    {
-        DestinationImpl destination = DESTINATION_CACHE.get(address);
-        if(destination == null)
-        {
-            destination = new DestinationImpl(address);
-            DESTINATION_CACHE.put(address, destination);
-        }
-        return destination;
-    }
-
-    public String getQueueName() throws JMSException
-    {
-        return getAddress();
-    }
-
-    public String getTopicName() throws JMSException
-    {
-        return getAddress();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+
+import javax.jms.JMSException;
+import java.util.WeakHashMap;
+
+public class DestinationImpl implements Destination, Queue, Topic
+{
+    private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
+            new WeakHashMap<String, DestinationImpl>();
+
+    private final String _address;
+
+    protected DestinationImpl(String address)
+    {
+        _address = address;
+    }
+
+    public String getAddress()
+    {
+        return _address;
+    }
+
+    public static DestinationImpl valueOf(String address)
+    {
+        return address == null ? null : createDestination(address);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _address.hashCode();
+    }
+
+    @Override
+    public boolean equals(final Object obj)
+    {
+        return obj != null
+               && obj.getClass() == getClass()
+               && _address.equals(((DestinationImpl)obj)._address);
+    }
+
+    public static synchronized DestinationImpl createDestination(final String address)
+    {
+        DestinationImpl destination = DESTINATION_CACHE.get(address);
+        if(destination == null)
+        {
+            destination = new DestinationImpl(address);
+            DESTINATION_CACHE.put(address, destination);
+        }
+        return destination;
+    }
+
+    public String getQueueName() throws JMSException
+    {
+        return getAddress();
+    }
+
+    public String getTopicName() throws JMSException
+    {
+        return getAddress();
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Wed Sep 25 14:33:06 2013
@@ -1,444 +1,444 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.MapMessage;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import java.util.*;
-
-public class MapMessageImpl extends MessageImpl implements MapMessage
-{
-    private Map _map;
-
-    public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
-                          Footer footer,
-                          SessionImpl session)
-    {
-        super(header, messageAnnotations, properties, appProperties, footer, session);
-        _map = map;
-    }
-
-    MapMessageImpl(final SessionImpl session)
-    {
-        super(new Header(), new MessageAnnotations(new HashMap()),
-              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
-              session);
-        _map = new HashMap();
-    }
-
-    public boolean getBoolean(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Boolean)
-        {
-            return ((Boolean) value).booleanValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Boolean.valueOf((String) value);
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to boolean.");
-        }
-    }
-
-    public byte getByte(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Byte)
-        {
-            return ((Byte) value).byteValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Byte.valueOf((String) value).byteValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to byte.");
-        }    }
-
-    public short getShort(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Short)
-        {
-            return ((Short) value).shortValue();
-        }
-        else if (value instanceof Byte)
-        {
-            return ((Byte) value).shortValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Short.valueOf((String) value).shortValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to short.");
-        }    }
-
-    public char getChar(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (!itemExists(name))
-        {
-            throw new MessageFormatException("Property " + name + " not present");
-        }
-        else if (value instanceof Character)
-        {
-            return ((Character) value).charValue();
-        }
-        else if (value == null)
-        {
-            throw new NullPointerException("Property " + name + " has null value and therefore cannot "
-                + "be converted to char.");
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to boolan.");
-        }    }
-
-    public int getInt(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Integer)
-        {
-            return ((Integer) value).intValue();
-        }
-        else if (value instanceof Short)
-        {
-            return ((Short) value).intValue();
-        }
-        else if (value instanceof Byte)
-        {
-            return ((Byte) value).intValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Integer.valueOf((String) value).intValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to int.");
-        }
-    }
-
-    public long getLong(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Long)
-        {
-            return ((Long) value).longValue();
-        }
-        else if (value instanceof Integer)
-        {
-            return ((Integer) value).longValue();
-        }
-
-        if (value instanceof Short)
-        {
-            return ((Short) value).longValue();
-        }
-
-        if (value instanceof Byte)
-        {
-            return ((Byte) value).longValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Long.valueOf((String) value).longValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to long.");
-        }
-    }
-
-    public float getFloat(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Float)
-        {
-            return ((Float) value).floatValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Float.valueOf((String) value).floatValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to float.");
-        }
-    }
-
-    public double getDouble(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (value instanceof Double)
-        {
-            return ((Double) value).doubleValue();
-        }
-        else if (value instanceof Float)
-        {
-            return ((Float) value).doubleValue();
-        }
-        else if ((value instanceof String) || (value == null))
-        {
-            return Double.valueOf((String) value).doubleValue();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to double.");
-        }
-    }
-
-    public String getString(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if ((value instanceof String) || (value == null))
-        {
-            return (String) value;
-        }
-        else if (value instanceof byte[] || value instanceof Binary)
-        {
-            throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
-        }
-        else
-        {
-            return value.toString();
-        }
-    }
-
-    public byte[] getBytes(String name) throws JMSException
-    {
-        Object value = get(name);
-
-        if (!itemExists(name))
-        {
-            throw new MessageFormatException("Property " + name + " not present");
-        }
-        else if ((value instanceof byte[]) || (value == null))
-        {
-            return (byte[]) value;
-        }
-        else if(value instanceof Binary)
-        {
-            return ((Binary)value).getArray();
-        }
-        else
-        {
-            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
-                + " cannot be converted to byte[].");
-        }    }
-
-    public Object getObject(String s) throws JMSException
-    {
-        Object val = get(s);
-        return val instanceof Binary ? ((Binary)val).getArray() : val;
-    }
-
-    public Enumeration getMapNames() throws JMSException
-    {
-        return Collections.enumeration(keySet());
-    }
-
-    public void setBoolean(String name, boolean val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setByte(String name, byte val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setShort(String name, short val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setChar(String name, char val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setInt(String name, int val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setLong(String name, long val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setFloat(String name, float val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setDouble(String name, double val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setString(String name, String val) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        put(name, val);
-    }
-
-    public void setBytes(String name, byte[] val) throws JMSException
-    {
-        setBytes(name, val, 0, val == null ? 0 : val.length);
-    }
-
-    public void setBytes(String name, byte[] bytes, int offset, int length) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        byte[] val;
-
-        if(bytes == null)
-        {
-            val = null;
-        }
-        else
-        {
-            val = new byte[length];
-            System.arraycopy(bytes,offset,val,0,length);
-        }
-
-        put(name, new Binary(val));
-    }
-
-    public void setObject(String name, Object value) throws JMSException
-    {
-        checkWritable();
-        checkPropertyName(name);
-        if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
-                || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
-                || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
-        {
-            put(name, value);
-        }
-        else
-        {
-            throw new MessageFormatException("Cannot set property " + name + " to value " + value + "of type "
-                + value.getClass().getName() + ".");
-        }    }
-
-    public boolean itemExists(String s)
-    {
-        return _map.containsKey(s);
-    }
-
-    public Object get(final Object key)
-    {
-        return _map.get(key);
-    }
-
-    public Object put(final Object key, final Object val)
-    {
-        return _map.put(key, val);
-    }
-
-    public boolean itemExists(final Object key)
-    {
-        return _map.containsKey(key);
-    }
-
-    public Set<Object> keySet()
-    {
-        return _map.keySet();
-    }
-
-    @Override
-    public void clearBody() throws JMSException
-    {
-        super.clearBody();
-        _map.clear();
-    }
-
-    private void checkPropertyName(String propName)
-    {
-        if ((propName == null) || propName.equals(""))
-        {
-            throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
-        }
-    }
-
-    @Override Collection<Section> getSections()
-    {
-        List<Section> sections = new ArrayList<Section>();
-        sections.add(getHeader());
-        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
-        {
-            sections.add(getMessageAnnotations());
-        }
-        sections.add(getProperties());
-        sections.add(getApplicationProperties());
-        sections.add(new AmqpValue(_map));
-        sections.add(getFooter());
-        return sections;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.MapMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.*;
+
+public class MapMessageImpl extends MessageImpl implements MapMessage
+{
+    private Map _map;
+
+    public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
+                          Footer footer,
+                          SessionImpl session)
+    {
+        super(header, messageAnnotations, properties, appProperties, footer, session);
+        _map = map;
+    }
+
+    MapMessageImpl(final SessionImpl session)
+    {
+        super(new Header(), new MessageAnnotations(new HashMap()),
+              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+              session);
+        _map = new HashMap();
+    }
+
+    public boolean getBoolean(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Boolean)
+        {
+            return ((Boolean) value).booleanValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Boolean.valueOf((String) value);
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to boolean.");
+        }
+    }
+
+    public byte getByte(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Byte)
+        {
+            return ((Byte) value).byteValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Byte.valueOf((String) value).byteValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to byte.");
+        }    }
+
+    public short getShort(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Short)
+        {
+            return ((Short) value).shortValue();
+        }
+        else if (value instanceof Byte)
+        {
+            return ((Byte) value).shortValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Short.valueOf((String) value).shortValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to short.");
+        }    }
+
+    public char getChar(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (!itemExists(name))
+        {
+            throw new MessageFormatException("Property " + name + " not present");
+        }
+        else if (value instanceof Character)
+        {
+            return ((Character) value).charValue();
+        }
+        else if (value == null)
+        {
+            throw new NullPointerException("Property " + name + " has null value and therefore cannot "
+                + "be converted to char.");
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to boolan.");
+        }    }
+
+    public int getInt(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Integer)
+        {
+            return ((Integer) value).intValue();
+        }
+        else if (value instanceof Short)
+        {
+            return ((Short) value).intValue();
+        }
+        else if (value instanceof Byte)
+        {
+            return ((Byte) value).intValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Integer.valueOf((String) value).intValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to int.");
+        }
+    }
+
+    public long getLong(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Long)
+        {
+            return ((Long) value).longValue();
+        }
+        else if (value instanceof Integer)
+        {
+            return ((Integer) value).longValue();
+        }
+
+        if (value instanceof Short)
+        {
+            return ((Short) value).longValue();
+        }
+
+        if (value instanceof Byte)
+        {
+            return ((Byte) value).longValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Long.valueOf((String) value).longValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to long.");
+        }
+    }
+
+    public float getFloat(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Float)
+        {
+            return ((Float) value).floatValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Float.valueOf((String) value).floatValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to float.");
+        }
+    }
+
+    public double getDouble(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (value instanceof Double)
+        {
+            return ((Double) value).doubleValue();
+        }
+        else if (value instanceof Float)
+        {
+            return ((Float) value).doubleValue();
+        }
+        else if ((value instanceof String) || (value == null))
+        {
+            return Double.valueOf((String) value).doubleValue();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to double.");
+        }
+    }
+
+    public String getString(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if ((value instanceof String) || (value == null))
+        {
+            return (String) value;
+        }
+        else if (value instanceof byte[] || value instanceof Binary)
+        {
+            throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
+        }
+        else
+        {
+            return value.toString();
+        }
+    }
+
+    public byte[] getBytes(String name) throws JMSException
+    {
+        Object value = get(name);
+
+        if (!itemExists(name))
+        {
+            throw new MessageFormatException("Property " + name + " not present");
+        }
+        else if ((value instanceof byte[]) || (value == null))
+        {
+            return (byte[]) value;
+        }
+        else if(value instanceof Binary)
+        {
+            return ((Binary)value).getArray();
+        }
+        else
+        {
+            throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+                + " cannot be converted to byte[].");
+        }    }
+
+    public Object getObject(String s) throws JMSException
+    {
+        Object val = get(s);
+        return val instanceof Binary ? ((Binary)val).getArray() : val;
+    }
+
+    public Enumeration getMapNames() throws JMSException
+    {
+        return Collections.enumeration(keySet());
+    }
+
+    public void setBoolean(String name, boolean val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setByte(String name, byte val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setShort(String name, short val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setChar(String name, char val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setInt(String name, int val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setLong(String name, long val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setFloat(String name, float val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setDouble(String name, double val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setString(String name, String val) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        put(name, val);
+    }
+
+    public void setBytes(String name, byte[] val) throws JMSException
+    {
+        setBytes(name, val, 0, val == null ? 0 : val.length);
+    }
+
+    public void setBytes(String name, byte[] bytes, int offset, int length) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        byte[] val;
+
+        if(bytes == null)
+        {
+            val = null;
+        }
+        else
+        {
+            val = new byte[length];
+            System.arraycopy(bytes,offset,val,0,length);
+        }
+
+        put(name, new Binary(val));
+    }
+
+    public void setObject(String name, Object value) throws JMSException
+    {
+        checkWritable();
+        checkPropertyName(name);
+        if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+                || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+                || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+        {
+            put(name, value);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot set property " + name + " to value " + value + "of type "
+                + value.getClass().getName() + ".");
+        }    }
+
+    public boolean itemExists(String s)
+    {
+        return _map.containsKey(s);
+    }
+
+    public Object get(final Object key)
+    {
+        return _map.get(key);
+    }
+
+    public Object put(final Object key, final Object val)
+    {
+        return _map.put(key, val);
+    }
+
+    public boolean itemExists(final Object key)
+    {
+        return _map.containsKey(key);
+    }
+
+    public Set<Object> keySet()
+    {
+        return _map.keySet();
+    }
+
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _map.clear();
+    }
+
+    private void checkPropertyName(String propName)
+    {
+        if ((propName == null) || propName.equals(""))
+        {
+            throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+        }
+    }
+
+    @Override Collection<Section> getSections()
+    {
+        List<Section> sections = new ArrayList<Section>();
+        sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
+        sections.add(getProperties());
+        sections.add(getApplicationProperties());
+        sections.add(new AmqpValue(_map));
+        sections.add(getFooter());
+        return sections;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Wed Sep 25 14:33:06 2013
@@ -1,485 +1,485 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Transaction;
-import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-import org.apache.qpid.amqp_1_0.jms.Session;
-import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
-import org.apache.qpid.amqp_1_0.jms.Topic;
-import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
-{
-    private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local");
-    private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector");
-    private String _selector;
-    private boolean _noLocal;
-    private DestinationImpl _destination;
-    private SessionImpl _session;
-    private Receiver _receiver;
-    private Binary _lastUnackedMessage;
-    MessageListener _messageListener;
-
-    private boolean _isQueueConsumer;
-    private boolean _isTopicSubscriber;
-
-    private boolean _closed = false;
-    private String _linkName;
-    private boolean _durable;
-    private Collection<Binary> _txnMsgs = Collections.synchronizedCollection(new ArrayList<Binary>());
-    private Binary _lastTxnUpdate;
-    private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
-    private final List<Message> _replaymessages = new ArrayList<Message>();
-
-    MessageConsumerImpl(final Destination destination,
-                        final SessionImpl session,
-                        final String selector,
-                        final boolean noLocal) throws JMSException
-    {
-        this(destination,session,selector,noLocal,null,false);
-    }
-
-    MessageConsumerImpl(final Destination destination,
-                        final SessionImpl session,
-                        final String selector,
-                        final boolean noLocal,
-                        final String linkName,
-                        final boolean durable) throws JMSException
-    {
-        _selector = selector;
-        _noLocal = noLocal;
-        _linkName = linkName;
-        _durable = durable;
-        if(destination instanceof DestinationImpl)
-        {
-            _destination = (DestinationImpl) destination;
-            if(destination instanceof javax.jms.Queue)
-            {
-                _isQueueConsumer = true;
-            }
-            else if(destination instanceof javax.jms.Topic)
-            {
-                _isTopicSubscriber = true;
-            }
-            if(destination instanceof TemporaryDestination)
-            {
-                ((TemporaryDestination)destination).addConsumer(this);
-            }
-        }
-        else
-        {
-            throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
-        }
-        _session = session;
-
-        _receiver = createClientReceiver();
-        _receiver.setRemoteErrorListener(new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                try
-                {
-                    final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
-
-                    if(exceptionListener != null)
-                    {
-                        final Error receiverError = _receiver.getError();
-                        exceptionListener.onException(new JMSException(receiverError.getDescription(),
-                                receiverError.getCondition().getValue().toString()));
-
-                    }
-                }
-                catch (JMSException e)
-                {
-
-                }
-            }
-        });
-
-
-    }
-
-    protected Receiver createClientReceiver() throws JMSException
-    {
-        try
-        {
-            return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
-                    _linkName, _durable, getFilters(), null);
-        }
-        catch (ConnectionErrorException e)
-        {
-            Error error = e.getRemoteError();
-            if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
-            {
-                throw new InvalidSelectorException(e.getMessage());
-            }
-            else
-            {
-                throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
-
-            }
-        }
-    }
-
-    Map<Symbol, Filter> getFilters()
-    {
-        if(_selector == null || _selector.trim().equals(""))
-        {
-            if(_noLocal)
-            {
-                return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE);
-            }
-            else
-            {
-                return null;
-
-            }
-        }
-        else if(_noLocal)
-        {
-            Map<Symbol, Filter> filters = new HashMap<Symbol, Filter>();
-            filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE);
-            filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector));
-            return filters;
-        }
-        else
-        {
-            return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector));
-        }
-
-
-    }
-
-    public String getMessageSelector() throws JMSException
-    {
-        checkClosed();
-        return _selector;
-    }
-
-    public MessageListener getMessageListener() throws IllegalStateException
-    {
-        checkClosed();
-        return _messageListener;
-    }
-
-    public void setMessageListener(final MessageListener messageListener) throws JMSException
-    {
-        checkClosed();
-        _messageListener = messageListener;
-        _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
-                {
-
-                    public void messageArrived(final Receiver receiver)
-                    {
-                        _session.messageArrived(MessageConsumerImpl.this);
-                    }
-                });
-        _session.messageListenerSet( this );
-
-    }
-
-    public MessageImpl receive() throws JMSException
-    {
-        checkClosed();
-        return receiveImpl(-1L);
-    }
-
-    public MessageImpl receive(final long timeout) throws JMSException
-    {
-        checkClosed();
-        // TODO - validate timeout > 0
-
-        return receiveImpl(timeout);
-    }
-
-    public MessageImpl receiveNoWait() throws JMSException
-    {
-        checkClosed();
-        return receiveImpl(0L);
-    }
-
-    private MessageImpl receiveImpl(long timeout) throws JMSException
-    {
-
-        org.apache.qpid.amqp_1_0.client.Message msg;
-        boolean redelivery;
-        if(_replaymessages.isEmpty())
-        {
-            checkReceiverError();
-            msg = receive0(timeout);
-            redelivery = false;
-        }
-        else
-        {
-            msg = _replaymessages.remove(0);
-            redelivery = true;
-        }
-
-        if(msg != null)
-        {
-            preReceiveAction(msg);
-        }
-        return createJMSMessage(msg, redelivery);
-    }
-
-    void checkReceiverError() throws JMSException
-    {
-        final Error receiverError = _receiver.getError();
-        if(receiverError != null)
-        {
-            JMSException jmsException =
-                    new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
-
-            throw jmsException;
-        }
-    }
-
-    Message receive0(final long timeout)
-    {
-
-        Message message = _receiver.receive(timeout);
-        if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
-        {
-            _recoverReplayMessages.add(message);
-        }
-        return message;
-    }
-
-
-    void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
-    {
-        _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn());
-    }
-
-    MessageImpl createJMSMessage(final Message msg, boolean redelivery)
-    {
-        if(msg != null)
-        {
-            MessageFactory factory = _session.getMessageFactory();
-            final MessageImpl message = factory.createMessage(_destination, msg);
-            message.setFromQueue(_isQueueConsumer);
-            message.setFromTopic(_isTopicSubscriber);
-            if(redelivery)
-            {
-                if(!message.getJMSRedelivered())
-                {
-                    message.setJMSRedelivered(true);
-                }
-            }
-
-            return message;
-        }
-        else
-        {
-            return null;
-        }
-    }
-
-    public void close() throws JMSException
-    {
-        if(!_closed)
-        {
-            _closed = true;
-
-            closeUnderlyingReceiver(_receiver);
-
-            if(_destination instanceof TemporaryDestination)
-            {
-                ((TemporaryDestination)_destination).removeConsumer(this);
-            }
-        }
-    }
-
-    protected void closeUnderlyingReceiver(Receiver receiver)
-    {
-        receiver.close();
-    }
-
-    private void checkClosed() throws IllegalStateException
-    {
-        if(_closed)
-        {
-            throw new javax.jms.IllegalStateException("Closed");
-        }
-    }
-
-    void setLastUnackedMessage(final Binary deliveryTag)
-    {
-        _lastUnackedMessage = deliveryTag;
-    }
-
-    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
-    {
-        int acknowledgeMode = _session.getAckModeEnum().ordinal();
-
-        if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
-           || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
-           || acknowledgeMode == Session.SESSION_TRANSACTED)
-        {
-            acknowledge(msg);
-            if(acknowledgeMode == Session.SESSION_TRANSACTED)
-            {
-                _txnMsgs.add(msg.getDeliveryTag());
-            }
-        }
-        else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
-        {
-            setLastUnackedMessage(msg.getDeliveryTag());
-        }
-    }
-
-    void acknowledgeAll()
-    {
-        if(_lastUnackedMessage != null)
-        {
-            Transaction txn = _session.getTxn();
-            _receiver.acknowledgeAll(_lastUnackedMessage, txn, null);
-            if(txn != null)
-            {
-                _lastTxnUpdate = _lastUnackedMessage;
-            }
-            _lastUnackedMessage = null;
-
-        }
-        _recoverReplayMessages.clear();
-        if(!_replaymessages.isEmpty())
-        {
-            _recoverReplayMessages.addAll(_replaymessages);
-        }
-    }
-
-    void postRollback()
-    {
-        if(_lastTxnUpdate != null)
-        {
-            final Modified outcome = new Modified();
-            outcome.setDeliveryFailed(true);
-            _receiver.updateAll(outcome, _lastTxnUpdate);
-            _lastTxnUpdate = null;
-        }
-        for(Binary tag : _txnMsgs)
-        {
-            _receiver.modified(tag);
-        }
-        _txnMsgs.clear();
-    }
-
-    void postCommit()
-    {
-        _lastTxnUpdate = null;
-        _txnMsgs.clear();
-    }
-
-    public DestinationImpl getDestination() throws IllegalStateException
-    {
-        checkClosed();
-        return _destination;
-    }
-
-
-    public SessionImpl getSession() throws IllegalStateException
-    {
-        checkClosed();
-        return _session;
-    }
-
-    public boolean getNoLocal() throws IllegalStateException
-    {
-        checkClosed();
-        return _noLocal;
-    }
-
-    public void start()
-    {
-        _receiver.setCredit(UnsignedInteger.valueOf(100), true);
-    }
-
-    public Queue getQueue() throws JMSException
-    {
-        return (Queue) getDestination();
-    }
-
-    public Topic getTopic() throws JMSException
-    {
-        return (Topic) getDestination();
-    }
-
-    void setQueueConsumer(final boolean queueConsumer)
-    {
-        _isQueueConsumer = queueConsumer;
-    }
-
-    void setTopicSubscriber(final boolean topicSubscriber)
-    {
-        _isTopicSubscriber = topicSubscriber;
-    }
-
-    String getLinkName()
-    {
-        return _linkName;
-    }
-
-    boolean isDurable()
-    {
-        return _durable;
-    }
-
-    void doRecover()
-    {
-        _replaymessages.clear();
-        if(!_recoverReplayMessages.isEmpty())
-        {
-            _replaymessages.addAll(_recoverReplayMessages);
-            for(Message msg : _replaymessages)
-            {
-                _session.messageArrived(this);
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Transaction;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
+{
+    private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local");
+    private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector");
+    private String _selector;
+    private boolean _noLocal;
+    private DestinationImpl _destination;
+    private SessionImpl _session;
+    private Receiver _receiver;
+    private Binary _lastUnackedMessage;
+    MessageListener _messageListener;
+
+    private boolean _isQueueConsumer;
+    private boolean _isTopicSubscriber;
+
+    private boolean _closed = false;
+    private String _linkName;
+    private boolean _durable;
+    private Collection<Binary> _txnMsgs = Collections.synchronizedCollection(new ArrayList<Binary>());
+    private Binary _lastTxnUpdate;
+    private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
+    private final List<Message> _replaymessages = new ArrayList<Message>();
+
+    MessageConsumerImpl(final Destination destination,
+                        final SessionImpl session,
+                        final String selector,
+                        final boolean noLocal) throws JMSException
+    {
+        this(destination,session,selector,noLocal,null,false);
+    }
+
+    MessageConsumerImpl(final Destination destination,
+                        final SessionImpl session,
+                        final String selector,
+                        final boolean noLocal,
+                        final String linkName,
+                        final boolean durable) throws JMSException
+    {
+        _selector = selector;
+        _noLocal = noLocal;
+        _linkName = linkName;
+        _durable = durable;
+        if(destination instanceof DestinationImpl)
+        {
+            _destination = (DestinationImpl) destination;
+            if(destination instanceof javax.jms.Queue)
+            {
+                _isQueueConsumer = true;
+            }
+            else if(destination instanceof javax.jms.Topic)
+            {
+                _isTopicSubscriber = true;
+            }
+            if(destination instanceof TemporaryDestination)
+            {
+                ((TemporaryDestination)destination).addConsumer(this);
+            }
+        }
+        else
+        {
+            throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
+        }
+        _session = session;
+
+        _receiver = createClientReceiver();
+        _receiver.setRemoteErrorListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+                    if(exceptionListener != null)
+                    {
+                        final Error receiverError = _receiver.getError();
+                        exceptionListener.onException(new JMSException(receiverError.getDescription(),
+                                receiverError.getCondition().getValue().toString()));
+
+                    }
+                }
+                catch (JMSException e)
+                {
+
+                }
+            }
+        });
+
+
+    }
+
+    protected Receiver createClientReceiver() throws JMSException
+    {
+        try
+        {
+            return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
+                    _linkName, _durable, getFilters(), null);
+        }
+        catch (ConnectionErrorException e)
+        {
+            Error error = e.getRemoteError();
+            if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+            {
+                throw new InvalidSelectorException(e.getMessage());
+            }
+            else
+            {
+                throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+            }
+        }
+    }
+
+    Map<Symbol, Filter> getFilters()
+    {
+        if(_selector == null || _selector.trim().equals(""))
+        {
+            if(_noLocal)
+            {
+                return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE);
+            }
+            else
+            {
+                return null;
+
+            }
+        }
+        else if(_noLocal)
+        {
+            Map<Symbol, Filter> filters = new HashMap<Symbol, Filter>();
+            filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE);
+            filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector));
+            return filters;
+        }
+        else
+        {
+            return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector));
+        }
+
+
+    }
+
+    public String getMessageSelector() throws JMSException
+    {
+        checkClosed();
+        return _selector;
+    }
+
+    public MessageListener getMessageListener() throws IllegalStateException
+    {
+        checkClosed();
+        return _messageListener;
+    }
+
+    public void setMessageListener(final MessageListener messageListener) throws JMSException
+    {
+        checkClosed();
+        _messageListener = messageListener;
+        _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
+                {
+
+                    public void messageArrived(final Receiver receiver)
+                    {
+                        _session.messageArrived(MessageConsumerImpl.this);
+                    }
+                });
+        _session.messageListenerSet( this );
+
+    }
+
+    public MessageImpl receive() throws JMSException
+    {
+        checkClosed();
+        return receiveImpl(-1L);
+    }
+
+    public MessageImpl receive(final long timeout) throws JMSException
+    {
+        checkClosed();
+        // TODO - validate timeout > 0
+
+        return receiveImpl(timeout);
+    }
+
+    public MessageImpl receiveNoWait() throws JMSException
+    {
+        checkClosed();
+        return receiveImpl(0L);
+    }
+
+    private MessageImpl receiveImpl(long timeout) throws JMSException
+    {
+
+        org.apache.qpid.amqp_1_0.client.Message msg;
+        boolean redelivery;
+        if(_replaymessages.isEmpty())
+        {
+            checkReceiverError();
+            msg = receive0(timeout);
+            redelivery = false;
+        }
+        else
+        {
+            msg = _replaymessages.remove(0);
+            redelivery = true;
+        }
+
+        if(msg != null)
+        {
+            preReceiveAction(msg);
+        }
+        return createJMSMessage(msg, redelivery);
+    }
+
+    void checkReceiverError() throws JMSException
+    {
+        final Error receiverError = _receiver.getError();
+        if(receiverError != null)
+        {
+            JMSException jmsException =
+                    new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+            throw jmsException;
+        }
+    }
+
+    Message receive0(final long timeout)
+    {
+
+        Message message = _receiver.receive(timeout);
+        if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+        {
+            _recoverReplayMessages.add(message);
+        }
+        return message;
+    }
+
+
+    void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
+    {
+        _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn());
+    }
+
+    MessageImpl createJMSMessage(final Message msg, boolean redelivery)
+    {
+        if(msg != null)
+        {
+            MessageFactory factory = _session.getMessageFactory();
+            final MessageImpl message = factory.createMessage(_destination, msg);
+            message.setFromQueue(_isQueueConsumer);
+            message.setFromTopic(_isTopicSubscriber);
+            if(redelivery)
+            {
+                if(!message.getJMSRedelivered())
+                {
+                    message.setJMSRedelivered(true);
+                }
+            }
+
+            return message;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    public void close() throws JMSException
+    {
+        if(!_closed)
+        {
+            _closed = true;
+
+            closeUnderlyingReceiver(_receiver);
+
+            if(_destination instanceof TemporaryDestination)
+            {
+                ((TemporaryDestination)_destination).removeConsumer(this);
+            }
+        }
+    }
+
+    protected void closeUnderlyingReceiver(Receiver receiver)
+    {
+        receiver.close();
+    }
+
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_closed)
+        {
+            throw new javax.jms.IllegalStateException("Closed");
+        }
+    }
+
+    void setLastUnackedMessage(final Binary deliveryTag)
+    {
+        _lastUnackedMessage = deliveryTag;
+    }
+
+    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
+    {
+        int acknowledgeMode = _session.getAckModeEnum().ordinal();
+
+        if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
+           || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
+           || acknowledgeMode == Session.SESSION_TRANSACTED)
+        {
+            acknowledge(msg);
+            if(acknowledgeMode == Session.SESSION_TRANSACTED)
+            {
+                _txnMsgs.add(msg.getDeliveryTag());
+            }
+        }
+        else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+        {
+            setLastUnackedMessage(msg.getDeliveryTag());
+        }
+    }
+
+    void acknowledgeAll()
+    {
+        if(_lastUnackedMessage != null)
+        {
+            Transaction txn = _session.getTxn();
+            _receiver.acknowledgeAll(_lastUnackedMessage, txn, null);
+            if(txn != null)
+            {
+                _lastTxnUpdate = _lastUnackedMessage;
+            }
+            _lastUnackedMessage = null;
+
+        }
+        _recoverReplayMessages.clear();
+        if(!_replaymessages.isEmpty())
+        {
+            _recoverReplayMessages.addAll(_replaymessages);
+        }
+    }
+
+    void postRollback()
+    {
+        if(_lastTxnUpdate != null)
+        {
+            final Modified outcome = new Modified();
+            outcome.setDeliveryFailed(true);
+            _receiver.updateAll(outcome, _lastTxnUpdate);
+            _lastTxnUpdate = null;
+        }
+        for(Binary tag : _txnMsgs)
+        {
+            _receiver.modified(tag);
+        }
+        _txnMsgs.clear();
+    }
+
+    void postCommit()
+    {
+        _lastTxnUpdate = null;
+        _txnMsgs.clear();
+    }
+
+    public DestinationImpl getDestination() throws IllegalStateException
+    {
+        checkClosed();
+        return _destination;
+    }
+
+
+    public SessionImpl getSession() throws IllegalStateException
+    {
+        checkClosed();
+        return _session;
+    }
+
+    public boolean getNoLocal() throws IllegalStateException
+    {
+        checkClosed();
+        return _noLocal;
+    }
+
+    public void start()
+    {
+        _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+    }
+
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) getDestination();
+    }
+
+    public Topic getTopic() throws JMSException
+    {
+        return (Topic) getDestination();
+    }
+
+    void setQueueConsumer(final boolean queueConsumer)
+    {
+        _isQueueConsumer = queueConsumer;
+    }
+
+    void setTopicSubscriber(final boolean topicSubscriber)
+    {
+        _isTopicSubscriber = topicSubscriber;
+    }
+
+    String getLinkName()
+    {
+        return _linkName;
+    }
+
+    boolean isDurable()
+    {
+        return _durable;
+    }
+
+    void doRecover()
+    {
+        _replaymessages.clear();
+        if(!_recoverReplayMessages.isEmpty())
+        {
+            _replaymessages.addAll(_recoverReplayMessages);
+            for(Message msg : _replaymessages)
+            {
+                _session.messageArrived(this);
+            }
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Wed Sep 25 14:33:06 2013
@@ -1,191 +1,191 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.*;
-
-class MessageFactory
-{
-    private final SessionImpl _session;
-
-
-    MessageFactory(final SessionImpl session)
-    {
-        _session = session;
-    }
-
-    public MessageImpl createMessage(final DestinationImpl destination, final Message msg)
-    {
-        MessageImpl message;
-        List<Section> payload = msg.getPayload();
-        Header header = null;
-        MessageAnnotations messageAnnotations = null;
-
-        Properties properties = null;
-        ApplicationProperties appProperties = null;
-        Footer footer;
-
-        Iterator<Section> iter = payload.iterator();
-        List<Section> body = new ArrayList<Section>();
-
-        Section section = iter.hasNext() ? iter.next() : null;
-
-        if(section instanceof Header)
-        {
-            header = (Header) section;
-            section = iter.hasNext() ? iter.next() : null;
-        }
-
-        if(section instanceof MessageAnnotations)
-        {
-            messageAnnotations = (MessageAnnotations) section;
-            section = iter.hasNext() ? iter.next() : null;
-        }
-
-        if(section instanceof Properties)
-        {
-            properties = (Properties) section;
-            section = iter.hasNext() ? iter.next() : null;
-        }
-
-        if(section instanceof ApplicationProperties)
-        {
-            appProperties = (ApplicationProperties) section;
-            section = iter.hasNext() ? iter.next() : null;
-        }
-
-        while(section != null && !(section instanceof Footer))
-        {
-            body.add(section);
-            section = iter.hasNext() ? iter.next() : null;
-        }
-
-        footer = (Footer) section;
-
-        if(body.size() == 1)
-        {
-            Section bodySection = body.get(0);
-            if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
-            {
-                message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
-            }
-            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
-            {
-                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
-                                                (List) ((AmqpValue)bodySection).getValue(), footer, _session);
-            }
-            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
-            {
-                message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
-                                                (String) ((AmqpValue)bodySection).getValue(), footer, _session);
-            }
-            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary)
-            {
-
-                Binary value = (Binary) ((AmqpValue) bodySection).getValue();
-                message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties,
-                                               new Data(value), footer, _session);
-            }
-            else if(bodySection instanceof Data)
-            {
-                if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
-                {
-
-
-                    message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties,
-                                                    (Data) bodySection,
-                                                    footer,
-                                                    _session);
-                }
-                else
-                {
-                    message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
-                }
-            }
-            else if(bodySection instanceof AmqpSequence)
-            {
-                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
-            }
-
-            /*else if(bodySection instanceof AmqpDataSection)
-            {
-                AmqpDataSection dataSection = (AmqpDataSection) bodySection;
-
-                List<Object> data = new ArrayList<Object>();
-
-                ListIterator<Object> dataIter = dataSection.iterator();
-
-                while(dataIter.hasNext())
-                {
-                    data.add(dataIter.next());
-                }
-
-                if(data.size() == 1)
-                {
-                    final Object obj = data.get(0);
-                    if( obj instanceof String)
-                    {
-                        message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
-                    }
-                    else if(obj instanceof JavaSerializable)
-                    {
-                        // TODO - ObjectMessage
-                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
-                    }
-                    else if(obj instanceof Serializable)
-                    {
-                        message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
-                    }
-                    else
-                    {
-                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
-                    }
-                }
-                else
-                {
-                    // not a text message
-                    message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
-                }
-            }*/
-            else
-            {
-                message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
-            }
-        }
-        else
-        {
-            message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
-        }
-
-        message.setReadOnly();
-
-        return message;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.*;
+
+class MessageFactory
+{
+    private final SessionImpl _session;
+
+
+    MessageFactory(final SessionImpl session)
+    {
+        _session = session;
+    }
+
+    public MessageImpl createMessage(final DestinationImpl destination, final Message msg)
+    {
+        MessageImpl message;
+        List<Section> payload = msg.getPayload();
+        Header header = null;
+        MessageAnnotations messageAnnotations = null;
+
+        Properties properties = null;
+        ApplicationProperties appProperties = null;
+        Footer footer;
+
+        Iterator<Section> iter = payload.iterator();
+        List<Section> body = new ArrayList<Section>();
+
+        Section section = iter.hasNext() ? iter.next() : null;
+
+        if(section instanceof Header)
+        {
+            header = (Header) section;
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
+        if(section instanceof MessageAnnotations)
+        {
+            messageAnnotations = (MessageAnnotations) section;
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
+        if(section instanceof Properties)
+        {
+            properties = (Properties) section;
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
+        if(section instanceof ApplicationProperties)
+        {
+            appProperties = (ApplicationProperties) section;
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
+        while(section != null && !(section instanceof Footer))
+        {
+            body.add(section);
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
+        footer = (Footer) section;
+
+        if(body.size() == 1)
+        {
+            Section bodySection = body.get(0);
+            if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
+            {
+                message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+            }
+            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
+            {
+                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
+                                                (List) ((AmqpValue)bodySection).getValue(), footer, _session);
+            }
+            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
+            {
+                message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
+                                                (String) ((AmqpValue)bodySection).getValue(), footer, _session);
+            }
+            else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary)
+            {
+
+                Binary value = (Binary) ((AmqpValue) bodySection).getValue();
+                message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties,
+                                               new Data(value), footer, _session);
+            }
+            else if(bodySection instanceof Data)
+            {
+                if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+                {
+
+
+                    message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties,
+                                                    (Data) bodySection,
+                                                    footer,
+                                                    _session);
+                }
+                else
+                {
+                    message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
+                }
+            }
+            else if(bodySection instanceof AmqpSequence)
+            {
+                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+            }
+
+            /*else if(bodySection instanceof AmqpDataSection)
+            {
+                AmqpDataSection dataSection = (AmqpDataSection) bodySection;
+
+                List<Object> data = new ArrayList<Object>();
+
+                ListIterator<Object> dataIter = dataSection.iterator();
+
+                while(dataIter.hasNext())
+                {
+                    data.add(dataIter.next());
+                }
+
+                if(data.size() == 1)
+                {
+                    final Object obj = data.get(0);
+                    if( obj instanceof String)
+                    {
+                        message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
+                    }
+                    else if(obj instanceof JavaSerializable)
+                    {
+                        // TODO - ObjectMessage
+                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                    }
+                    else if(obj instanceof Serializable)
+                    {
+                        message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
+                    }
+                    else
+                    {
+                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                    }
+                }
+                else
+                {
+                    // not a text message
+                    message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                }
+            }*/
+            else
+            {
+                message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+            }
+        }
+        else
+        {
+            message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+        }
+
+        message.setReadOnly();
+
+        return message;
+    }
+}

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org