You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/25 16:33:07 UTC
svn commit: r1526190 [3/7] - in /qpid/trunk/qpid/java/amqp-1-0-client-jms:
example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/
src/main/java/org/apache/qpid/amqp_1_0/jms/
src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ src/main/java/org/apache...
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java Wed Sep 25 14:33:06 2013
@@ -1,105 +1,105 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
-
-import javax.jms.JMSException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-
-public class ConnectionMetaDataImpl implements ConnectionMetaData
-{
- private static final int JMS_MAJOR_VERSION = 1;
- private static final int JMS_MINOR_VERSION = 1;
-
- private static final int PROVIDER_MAJOR_VERSION = 1;
- private static final int PROVIDER_MINOR_VERSION = 0;
-
-
- private final int _amqpMajorVersion;
- private final int _amqpMinorVersion;
- private final int _amqpRevisionVersion;
- private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
-
- public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
- {
- _amqpMajorVersion = amqpMajorVersion;
- _amqpMinorVersion = amqpMinorVersion;
- _amqpRevisionVersion = amqpRevisionVersion;
- }
-
- public String getJMSVersion() throws JMSException
- {
- return getJMSMajorVersion() + "." + getJMSMinorVersion();
- }
-
- public int getJMSMajorVersion() throws JMSException
- {
- return JMS_MAJOR_VERSION;
- }
-
- public int getJMSMinorVersion() throws JMSException
- {
- return JMS_MINOR_VERSION;
- }
-
- public String getJMSProviderName() throws JMSException
- {
- return "AMQP.ORG";
- }
-
- public String getProviderVersion() throws JMSException
- {
- return getProviderMajorVersion() + "." + getProviderMinorVersion();
- }
-
- public int getProviderMajorVersion() throws JMSException
- {
- return PROVIDER_MAJOR_VERSION;
- }
-
- public int getProviderMinorVersion() throws JMSException
- {
- return PROVIDER_MINOR_VERSION;
- }
-
- public Enumeration getJMSXPropertyNames() throws JMSException
- {
-
- return Collections.enumeration(_jmsxProperties);
- }
-
- public int getAMQPMajorVersion()
- {
- return _amqpMajorVersion;
- }
-
- public int getAMQPMinorVersion()
- {
- return _amqpMinorVersion;
- }
-
- public int getAMQPRevisionVersion()
- {
- return _amqpRevisionVersion;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
+
+import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+ private static final int JMS_MAJOR_VERSION = 1;
+ private static final int JMS_MINOR_VERSION = 1;
+
+ private static final int PROVIDER_MAJOR_VERSION = 1;
+ private static final int PROVIDER_MINOR_VERSION = 0;
+
+
+ private final int _amqpMajorVersion;
+ private final int _amqpMinorVersion;
+ private final int _amqpRevisionVersion;
+ private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
+
+ public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
+ {
+ _amqpMajorVersion = amqpMajorVersion;
+ _amqpMinorVersion = amqpMinorVersion;
+ _amqpRevisionVersion = amqpRevisionVersion;
+ }
+
+ public String getJMSVersion() throws JMSException
+ {
+ return getJMSMajorVersion() + "." + getJMSMinorVersion();
+ }
+
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return JMS_MAJOR_VERSION;
+ }
+
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return JMS_MINOR_VERSION;
+ }
+
+ public String getJMSProviderName() throws JMSException
+ {
+ return "AMQP.ORG";
+ }
+
+ public String getProviderVersion() throws JMSException
+ {
+ return getProviderMajorVersion() + "." + getProviderMinorVersion();
+ }
+
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+
+ return Collections.enumeration(_jmsxProperties);
+ }
+
+ public int getAMQPMajorVersion()
+ {
+ return _amqpMajorVersion;
+ }
+
+ public int getAMQPMinorVersion()
+ {
+ return _amqpMinorVersion;
+ }
+
+ public int getAMQPRevisionVersion()
+ {
+ return _amqpRevisionVersion;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DecodedDestination.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java Wed Sep 25 14:33:06 2013
@@ -1,85 +1,85 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.Destination;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.Topic;
-
-import javax.jms.JMSException;
-import java.util.WeakHashMap;
-
-public class DestinationImpl implements Destination, Queue, Topic
-{
- private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
- new WeakHashMap<String, DestinationImpl>();
-
- private final String _address;
-
- protected DestinationImpl(String address)
- {
- _address = address;
- }
-
- public String getAddress()
- {
- return _address;
- }
-
- public static DestinationImpl valueOf(String address)
- {
- return address == null ? null : createDestination(address);
- }
-
- @Override
- public int hashCode()
- {
- return _address.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj)
- {
- return obj != null
- && obj.getClass() == getClass()
- && _address.equals(((DestinationImpl)obj)._address);
- }
-
- public static synchronized DestinationImpl createDestination(final String address)
- {
- DestinationImpl destination = DESTINATION_CACHE.get(address);
- if(destination == null)
- {
- destination = new DestinationImpl(address);
- DESTINATION_CACHE.put(address, destination);
- }
- return destination;
- }
-
- public String getQueueName() throws JMSException
- {
- return getAddress();
- }
-
- public String getTopicName() throws JMSException
- {
- return getAddress();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+
+import javax.jms.JMSException;
+import java.util.WeakHashMap;
+
+public class DestinationImpl implements Destination, Queue, Topic
+{
+ private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
+ new WeakHashMap<String, DestinationImpl>();
+
+ private final String _address;
+
+ protected DestinationImpl(String address)
+ {
+ _address = address;
+ }
+
+ public String getAddress()
+ {
+ return _address;
+ }
+
+ public static DestinationImpl valueOf(String address)
+ {
+ return address == null ? null : createDestination(address);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _address.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj)
+ {
+ return obj != null
+ && obj.getClass() == getClass()
+ && _address.equals(((DestinationImpl)obj)._address);
+ }
+
+ public static synchronized DestinationImpl createDestination(final String address)
+ {
+ DestinationImpl destination = DESTINATION_CACHE.get(address);
+ if(destination == null)
+ {
+ destination = new DestinationImpl(address);
+ DESTINATION_CACHE.put(address, destination);
+ }
+ return destination;
+ }
+
+ public String getQueueName() throws JMSException
+ {
+ return getAddress();
+ }
+
+ public String getTopicName() throws JMSException
+ {
+ return getAddress();
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Wed Sep 25 14:33:06 2013
@@ -1,444 +1,444 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.jms.MapMessage;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.MessageFormatException;
-import java.util.*;
-
-public class MapMessageImpl extends MessageImpl implements MapMessage
-{
- private Map _map;
-
- public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
- Footer footer,
- SessionImpl session)
- {
- super(header, messageAnnotations, properties, appProperties, footer, session);
- _map = map;
- }
-
- MapMessageImpl(final SessionImpl session)
- {
- super(new Header(), new MessageAnnotations(new HashMap()),
- new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
- session);
- _map = new HashMap();
- }
-
- public boolean getBoolean(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Boolean)
- {
- return ((Boolean) value).booleanValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Boolean.valueOf((String) value);
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to boolean.");
- }
- }
-
- public byte getByte(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Byte)
- {
- return ((Byte) value).byteValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Byte.valueOf((String) value).byteValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to byte.");
- } }
-
- public short getShort(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Short)
- {
- return ((Short) value).shortValue();
- }
- else if (value instanceof Byte)
- {
- return ((Byte) value).shortValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Short.valueOf((String) value).shortValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to short.");
- } }
-
- public char getChar(String name) throws JMSException
- {
- Object value = get(name);
-
- if (!itemExists(name))
- {
- throw new MessageFormatException("Property " + name + " not present");
- }
- else if (value instanceof Character)
- {
- return ((Character) value).charValue();
- }
- else if (value == null)
- {
- throw new NullPointerException("Property " + name + " has null value and therefore cannot "
- + "be converted to char.");
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to boolan.");
- } }
-
- public int getInt(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Integer)
- {
- return ((Integer) value).intValue();
- }
- else if (value instanceof Short)
- {
- return ((Short) value).intValue();
- }
- else if (value instanceof Byte)
- {
- return ((Byte) value).intValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Integer.valueOf((String) value).intValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to int.");
- }
- }
-
- public long getLong(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Long)
- {
- return ((Long) value).longValue();
- }
- else if (value instanceof Integer)
- {
- return ((Integer) value).longValue();
- }
-
- if (value instanceof Short)
- {
- return ((Short) value).longValue();
- }
-
- if (value instanceof Byte)
- {
- return ((Byte) value).longValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Long.valueOf((String) value).longValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to long.");
- }
- }
-
- public float getFloat(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Float)
- {
- return ((Float) value).floatValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Float.valueOf((String) value).floatValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to float.");
- }
- }
-
- public double getDouble(String name) throws JMSException
- {
- Object value = get(name);
-
- if (value instanceof Double)
- {
- return ((Double) value).doubleValue();
- }
- else if (value instanceof Float)
- {
- return ((Float) value).doubleValue();
- }
- else if ((value instanceof String) || (value == null))
- {
- return Double.valueOf((String) value).doubleValue();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to double.");
- }
- }
-
- public String getString(String name) throws JMSException
- {
- Object value = get(name);
-
- if ((value instanceof String) || (value == null))
- {
- return (String) value;
- }
- else if (value instanceof byte[] || value instanceof Binary)
- {
- throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
- }
- else
- {
- return value.toString();
- }
- }
-
- public byte[] getBytes(String name) throws JMSException
- {
- Object value = get(name);
-
- if (!itemExists(name))
- {
- throw new MessageFormatException("Property " + name + " not present");
- }
- else if ((value instanceof byte[]) || (value == null))
- {
- return (byte[]) value;
- }
- else if(value instanceof Binary)
- {
- return ((Binary)value).getArray();
- }
- else
- {
- throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
- + " cannot be converted to byte[].");
- } }
-
- public Object getObject(String s) throws JMSException
- {
- Object val = get(s);
- return val instanceof Binary ? ((Binary)val).getArray() : val;
- }
-
- public Enumeration getMapNames() throws JMSException
- {
- return Collections.enumeration(keySet());
- }
-
- public void setBoolean(String name, boolean val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setByte(String name, byte val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setShort(String name, short val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setChar(String name, char val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setInt(String name, int val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setLong(String name, long val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setFloat(String name, float val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setDouble(String name, double val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setString(String name, String val) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- put(name, val);
- }
-
- public void setBytes(String name, byte[] val) throws JMSException
- {
- setBytes(name, val, 0, val == null ? 0 : val.length);
- }
-
- public void setBytes(String name, byte[] bytes, int offset, int length) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- byte[] val;
-
- if(bytes == null)
- {
- val = null;
- }
- else
- {
- val = new byte[length];
- System.arraycopy(bytes,offset,val,0,length);
- }
-
- put(name, new Binary(val));
- }
-
- public void setObject(String name, Object value) throws JMSException
- {
- checkWritable();
- checkPropertyName(name);
- if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
- || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
- || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
- {
- put(name, value);
- }
- else
- {
- throw new MessageFormatException("Cannot set property " + name + " to value " + value + "of type "
- + value.getClass().getName() + ".");
- } }
-
- public boolean itemExists(String s)
- {
- return _map.containsKey(s);
- }
-
- public Object get(final Object key)
- {
- return _map.get(key);
- }
-
- public Object put(final Object key, final Object val)
- {
- return _map.put(key, val);
- }
-
- public boolean itemExists(final Object key)
- {
- return _map.containsKey(key);
- }
-
- public Set<Object> keySet()
- {
- return _map.keySet();
- }
-
- @Override
- public void clearBody() throws JMSException
- {
- super.clearBody();
- _map.clear();
- }
-
- private void checkPropertyName(String propName)
- {
- if ((propName == null) || propName.equals(""))
- {
- throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
- }
- }
-
- @Override Collection<Section> getSections()
- {
- List<Section> sections = new ArrayList<Section>();
- sections.add(getHeader());
- if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
- {
- sections.add(getMessageAnnotations());
- }
- sections.add(getProperties());
- sections.add(getApplicationProperties());
- sections.add(new AmqpValue(_map));
- sections.add(getFooter());
- return sections;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.jms.MapMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.*;
+
+public class MapMessageImpl extends MessageImpl implements MapMessage
+{
+ private Map _map;
+
+ public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
+ Footer footer,
+ SessionImpl session)
+ {
+ super(header, messageAnnotations, properties, appProperties, footer, session);
+ _map = map;
+ }
+
+ MapMessageImpl(final SessionImpl session)
+ {
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ session);
+ _map = new HashMap();
+ }
+
+ public boolean getBoolean(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Boolean)
+ {
+ return ((Boolean) value).booleanValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Boolean.valueOf((String) value);
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to boolean.");
+ }
+ }
+
+ public byte getByte(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).byteValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Byte.valueOf((String) value).byteValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to byte.");
+ } }
+
+ public short getShort(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).shortValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).shortValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Short.valueOf((String) value).shortValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to short.");
+ } }
+
+ public char getChar(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (!itemExists(name))
+ {
+ throw new MessageFormatException("Property " + name + " not present");
+ }
+ else if (value instanceof Character)
+ {
+ return ((Character) value).charValue();
+ }
+ else if (value == null)
+ {
+ throw new NullPointerException("Property " + name + " has null value and therefore cannot "
+ + "be converted to char.");
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to boolan.");
+ } }
+
+ public int getInt(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Integer)
+ {
+ return ((Integer) value).intValue();
+ }
+ else if (value instanceof Short)
+ {
+ return ((Short) value).intValue();
+ }
+ else if (value instanceof Byte)
+ {
+ return ((Byte) value).intValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Integer.valueOf((String) value).intValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to int.");
+ }
+ }
+
+ public long getLong(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Long)
+ {
+ return ((Long) value).longValue();
+ }
+ else if (value instanceof Integer)
+ {
+ return ((Integer) value).longValue();
+ }
+
+ if (value instanceof Short)
+ {
+ return ((Short) value).longValue();
+ }
+
+ if (value instanceof Byte)
+ {
+ return ((Byte) value).longValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Long.valueOf((String) value).longValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to long.");
+ }
+ }
+
+ public float getFloat(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Float)
+ {
+ return ((Float) value).floatValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Float.valueOf((String) value).floatValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to float.");
+ }
+ }
+
+ public double getDouble(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (value instanceof Double)
+ {
+ return ((Double) value).doubleValue();
+ }
+ else if (value instanceof Float)
+ {
+ return ((Float) value).doubleValue();
+ }
+ else if ((value instanceof String) || (value == null))
+ {
+ return Double.valueOf((String) value).doubleValue();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to double.");
+ }
+ }
+
+ public String getString(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if ((value instanceof String) || (value == null))
+ {
+ return (String) value;
+ }
+ else if (value instanceof byte[] || value instanceof Binary)
+ {
+ throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
+ }
+ else
+ {
+ return value.toString();
+ }
+ }
+
+ public byte[] getBytes(String name) throws JMSException
+ {
+ Object value = get(name);
+
+ if (!itemExists(name))
+ {
+ throw new MessageFormatException("Property " + name + " not present");
+ }
+ else if ((value instanceof byte[]) || (value == null))
+ {
+ return (byte[]) value;
+ }
+ else if(value instanceof Binary)
+ {
+ return ((Binary)value).getArray();
+ }
+ else
+ {
+ throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
+ + " cannot be converted to byte[].");
+ } }
+
+ public Object getObject(String s) throws JMSException
+ {
+ Object val = get(s);
+ return val instanceof Binary ? ((Binary)val).getArray() : val;
+ }
+
+ public Enumeration getMapNames() throws JMSException
+ {
+ return Collections.enumeration(keySet());
+ }
+
+ public void setBoolean(String name, boolean val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setByte(String name, byte val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setShort(String name, short val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setChar(String name, char val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setInt(String name, int val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setLong(String name, long val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setFloat(String name, float val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setDouble(String name, double val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setString(String name, String val) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ put(name, val);
+ }
+
+ public void setBytes(String name, byte[] val) throws JMSException
+ {
+ setBytes(name, val, 0, val == null ? 0 : val.length);
+ }
+
+ public void setBytes(String name, byte[] bytes, int offset, int length) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ byte[] val;
+
+ if(bytes == null)
+ {
+ val = null;
+ }
+ else
+ {
+ val = new byte[length];
+ System.arraycopy(bytes,offset,val,0,length);
+ }
+
+ put(name, new Binary(val));
+ }
+
+ public void setObject(String name, Object value) throws JMSException
+ {
+ checkWritable();
+ checkPropertyName(name);
+ if ((value instanceof Boolean) || (value instanceof Byte) || (value instanceof Short) || (value instanceof Integer)
+ || (value instanceof Long) || (value instanceof Character) || (value instanceof Float)
+ || (value instanceof Double) || (value instanceof String) || (value instanceof byte[]) || (value == null))
+ {
+ put(name, value);
+ }
+ else
+ {
+ throw new MessageFormatException("Cannot set property " + name + " to value " + value + "of type "
+ + value.getClass().getName() + ".");
+ } }
+
+ public boolean itemExists(String s)
+ {
+ return _map.containsKey(s);
+ }
+
+ public Object get(final Object key)
+ {
+ return _map.get(key);
+ }
+
+ public Object put(final Object key, final Object val)
+ {
+ return _map.put(key, val);
+ }
+
+ public boolean itemExists(final Object key)
+ {
+ return _map.containsKey(key);
+ }
+
+ public Set<Object> keySet()
+ {
+ return _map.keySet();
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _map.clear();
+ }
+
+ private void checkPropertyName(String propName)
+ {
+ if ((propName == null) || propName.equals(""))
+ {
+ throw new IllegalArgumentException("Property name cannot be null, or the empty String.");
+ }
+ }
+
+ @Override Collection<Section> getSections()
+ {
+ List<Section> sections = new ArrayList<Section>();
+ sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
+ sections.add(getProperties());
+ sections.add(getApplicationProperties());
+ sections.add(new AmqpValue(_map));
+ sections.add(getFooter());
+ return sections;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Wed Sep 25 14:33:06 2013
@@ -1,485 +1,485 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
-import javax.jms.InvalidDestinationException;
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
-import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.client.Receiver;
-import org.apache.qpid.amqp_1_0.client.Transaction;
-import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
-import org.apache.qpid.amqp_1_0.jms.Queue;
-import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-import org.apache.qpid.amqp_1_0.jms.Session;
-import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
-import org.apache.qpid.amqp_1_0.jms.Topic;
-import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.Filter;
-import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
-import org.apache.qpid.amqp_1_0.type.transport.Error;
-
-public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
-{
- private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local");
- private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector");
- private String _selector;
- private boolean _noLocal;
- private DestinationImpl _destination;
- private SessionImpl _session;
- private Receiver _receiver;
- private Binary _lastUnackedMessage;
- MessageListener _messageListener;
-
- private boolean _isQueueConsumer;
- private boolean _isTopicSubscriber;
-
- private boolean _closed = false;
- private String _linkName;
- private boolean _durable;
- private Collection<Binary> _txnMsgs = Collections.synchronizedCollection(new ArrayList<Binary>());
- private Binary _lastTxnUpdate;
- private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
- private final List<Message> _replaymessages = new ArrayList<Message>();
-
- MessageConsumerImpl(final Destination destination,
- final SessionImpl session,
- final String selector,
- final boolean noLocal) throws JMSException
- {
- this(destination,session,selector,noLocal,null,false);
- }
-
- MessageConsumerImpl(final Destination destination,
- final SessionImpl session,
- final String selector,
- final boolean noLocal,
- final String linkName,
- final boolean durable) throws JMSException
- {
- _selector = selector;
- _noLocal = noLocal;
- _linkName = linkName;
- _durable = durable;
- if(destination instanceof DestinationImpl)
- {
- _destination = (DestinationImpl) destination;
- if(destination instanceof javax.jms.Queue)
- {
- _isQueueConsumer = true;
- }
- else if(destination instanceof javax.jms.Topic)
- {
- _isTopicSubscriber = true;
- }
- if(destination instanceof TemporaryDestination)
- {
- ((TemporaryDestination)destination).addConsumer(this);
- }
- }
- else
- {
- throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
- }
- _session = session;
-
- _receiver = createClientReceiver();
- _receiver.setRemoteErrorListener(new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
-
- if(exceptionListener != null)
- {
- final Error receiverError = _receiver.getError();
- exceptionListener.onException(new JMSException(receiverError.getDescription(),
- receiverError.getCondition().getValue().toString()));
-
- }
- }
- catch (JMSException e)
- {
-
- }
- }
- });
-
-
- }
-
- protected Receiver createClientReceiver() throws JMSException
- {
- try
- {
- return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
- _linkName, _durable, getFilters(), null);
- }
- catch (ConnectionErrorException e)
- {
- Error error = e.getRemoteError();
- if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
- {
- throw new InvalidSelectorException(e.getMessage());
- }
- else
- {
- throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
-
- }
- }
- }
-
- Map<Symbol, Filter> getFilters()
- {
- if(_selector == null || _selector.trim().equals(""))
- {
- if(_noLocal)
- {
- return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE);
- }
- else
- {
- return null;
-
- }
- }
- else if(_noLocal)
- {
- Map<Symbol, Filter> filters = new HashMap<Symbol, Filter>();
- filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE);
- filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector));
- return filters;
- }
- else
- {
- return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector));
- }
-
-
- }
-
- public String getMessageSelector() throws JMSException
- {
- checkClosed();
- return _selector;
- }
-
- public MessageListener getMessageListener() throws IllegalStateException
- {
- checkClosed();
- return _messageListener;
- }
-
- public void setMessageListener(final MessageListener messageListener) throws JMSException
- {
- checkClosed();
- _messageListener = messageListener;
- _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
- {
-
- public void messageArrived(final Receiver receiver)
- {
- _session.messageArrived(MessageConsumerImpl.this);
- }
- });
- _session.messageListenerSet( this );
-
- }
-
- public MessageImpl receive() throws JMSException
- {
- checkClosed();
- return receiveImpl(-1L);
- }
-
- public MessageImpl receive(final long timeout) throws JMSException
- {
- checkClosed();
- // TODO - validate timeout > 0
-
- return receiveImpl(timeout);
- }
-
- public MessageImpl receiveNoWait() throws JMSException
- {
- checkClosed();
- return receiveImpl(0L);
- }
-
- private MessageImpl receiveImpl(long timeout) throws JMSException
- {
-
- org.apache.qpid.amqp_1_0.client.Message msg;
- boolean redelivery;
- if(_replaymessages.isEmpty())
- {
- checkReceiverError();
- msg = receive0(timeout);
- redelivery = false;
- }
- else
- {
- msg = _replaymessages.remove(0);
- redelivery = true;
- }
-
- if(msg != null)
- {
- preReceiveAction(msg);
- }
- return createJMSMessage(msg, redelivery);
- }
-
- void checkReceiverError() throws JMSException
- {
- final Error receiverError = _receiver.getError();
- if(receiverError != null)
- {
- JMSException jmsException =
- new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
-
- throw jmsException;
- }
- }
-
- Message receive0(final long timeout)
- {
-
- Message message = _receiver.receive(timeout);
- if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
- {
- _recoverReplayMessages.add(message);
- }
- return message;
- }
-
-
- void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
- {
- _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn());
- }
-
- MessageImpl createJMSMessage(final Message msg, boolean redelivery)
- {
- if(msg != null)
- {
- MessageFactory factory = _session.getMessageFactory();
- final MessageImpl message = factory.createMessage(_destination, msg);
- message.setFromQueue(_isQueueConsumer);
- message.setFromTopic(_isTopicSubscriber);
- if(redelivery)
- {
- if(!message.getJMSRedelivered())
- {
- message.setJMSRedelivered(true);
- }
- }
-
- return message;
- }
- else
- {
- return null;
- }
- }
-
- public void close() throws JMSException
- {
- if(!_closed)
- {
- _closed = true;
-
- closeUnderlyingReceiver(_receiver);
-
- if(_destination instanceof TemporaryDestination)
- {
- ((TemporaryDestination)_destination).removeConsumer(this);
- }
- }
- }
-
- protected void closeUnderlyingReceiver(Receiver receiver)
- {
- receiver.close();
- }
-
- private void checkClosed() throws IllegalStateException
- {
- if(_closed)
- {
- throw new javax.jms.IllegalStateException("Closed");
- }
- }
-
- void setLastUnackedMessage(final Binary deliveryTag)
- {
- _lastUnackedMessage = deliveryTag;
- }
-
- void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
- {
- int acknowledgeMode = _session.getAckModeEnum().ordinal();
-
- if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
- || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
- || acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- acknowledge(msg);
- if(acknowledgeMode == Session.SESSION_TRANSACTED)
- {
- _txnMsgs.add(msg.getDeliveryTag());
- }
- }
- else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
- {
- setLastUnackedMessage(msg.getDeliveryTag());
- }
- }
-
- void acknowledgeAll()
- {
- if(_lastUnackedMessage != null)
- {
- Transaction txn = _session.getTxn();
- _receiver.acknowledgeAll(_lastUnackedMessage, txn, null);
- if(txn != null)
- {
- _lastTxnUpdate = _lastUnackedMessage;
- }
- _lastUnackedMessage = null;
-
- }
- _recoverReplayMessages.clear();
- if(!_replaymessages.isEmpty())
- {
- _recoverReplayMessages.addAll(_replaymessages);
- }
- }
-
- void postRollback()
- {
- if(_lastTxnUpdate != null)
- {
- final Modified outcome = new Modified();
- outcome.setDeliveryFailed(true);
- _receiver.updateAll(outcome, _lastTxnUpdate);
- _lastTxnUpdate = null;
- }
- for(Binary tag : _txnMsgs)
- {
- _receiver.modified(tag);
- }
- _txnMsgs.clear();
- }
-
- void postCommit()
- {
- _lastTxnUpdate = null;
- _txnMsgs.clear();
- }
-
- public DestinationImpl getDestination() throws IllegalStateException
- {
- checkClosed();
- return _destination;
- }
-
-
- public SessionImpl getSession() throws IllegalStateException
- {
- checkClosed();
- return _session;
- }
-
- public boolean getNoLocal() throws IllegalStateException
- {
- checkClosed();
- return _noLocal;
- }
-
- public void start()
- {
- _receiver.setCredit(UnsignedInteger.valueOf(100), true);
- }
-
- public Queue getQueue() throws JMSException
- {
- return (Queue) getDestination();
- }
-
- public Topic getTopic() throws JMSException
- {
- return (Topic) getDestination();
- }
-
- void setQueueConsumer(final boolean queueConsumer)
- {
- _isQueueConsumer = queueConsumer;
- }
-
- void setTopicSubscriber(final boolean topicSubscriber)
- {
- _isTopicSubscriber = topicSubscriber;
- }
-
- String getLinkName()
- {
- return _linkName;
- }
-
- boolean isDurable()
- {
- return _durable;
- }
-
- void doRecover()
- {
- _replaymessages.clear();
- if(!_recoverReplayMessages.isEmpty())
- {
- _replaymessages.addAll(_recoverReplayMessages);
- for(Message msg : _replaymessages)
- {
- _session.messageArrived(this);
- }
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import org.apache.qpid.amqp_1_0.client.AcknowledgeMode;
+import org.apache.qpid.amqp_1_0.client.ConnectionErrorException;
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.client.Transaction;
+import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
+{
+ private static final Symbol NO_LOCAL_FILTER_NAME = Symbol.valueOf("no-local");
+ private static final Symbol JMS_SELECTOR_FILTER_NAME = Symbol.valueOf("jms-selector");
+ private String _selector;
+ private boolean _noLocal;
+ private DestinationImpl _destination;
+ private SessionImpl _session;
+ private Receiver _receiver;
+ private Binary _lastUnackedMessage;
+ MessageListener _messageListener;
+
+ private boolean _isQueueConsumer;
+ private boolean _isTopicSubscriber;
+
+ private boolean _closed = false;
+ private String _linkName;
+ private boolean _durable;
+ private Collection<Binary> _txnMsgs = Collections.synchronizedCollection(new ArrayList<Binary>());
+ private Binary _lastTxnUpdate;
+ private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
+ private final List<Message> _replaymessages = new ArrayList<Message>();
+
+ MessageConsumerImpl(final Destination destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal) throws JMSException
+ {
+ this(destination,session,selector,noLocal,null,false);
+ }
+
+ MessageConsumerImpl(final Destination destination,
+ final SessionImpl session,
+ final String selector,
+ final boolean noLocal,
+ final String linkName,
+ final boolean durable) throws JMSException
+ {
+ _selector = selector;
+ _noLocal = noLocal;
+ _linkName = linkName;
+ _durable = durable;
+ if(destination instanceof DestinationImpl)
+ {
+ _destination = (DestinationImpl) destination;
+ if(destination instanceof javax.jms.Queue)
+ {
+ _isQueueConsumer = true;
+ }
+ else if(destination instanceof javax.jms.Topic)
+ {
+ _isTopicSubscriber = true;
+ }
+ if(destination instanceof TemporaryDestination)
+ {
+ ((TemporaryDestination)destination).addConsumer(this);
+ }
+ }
+ else
+ {
+ throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
+ }
+ _session = session;
+
+ _receiver = createClientReceiver();
+ _receiver.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final Error receiverError = _receiver.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
+
+
+ }
+
+ protected Receiver createClientReceiver() throws JMSException
+ {
+ try
+ {
+ return _session.getClientSession().createReceiver(_session.toAddress(_destination), AcknowledgeMode.ALO,
+ _linkName, _durable, getFilters(), null);
+ }
+ catch (ConnectionErrorException e)
+ {
+ Error error = e.getRemoteError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition()))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+ }
+ }
+ }
+
+ Map<Symbol, Filter> getFilters()
+ {
+ if(_selector == null || _selector.trim().equals(""))
+ {
+ if(_noLocal)
+ {
+ return Collections.singletonMap(NO_LOCAL_FILTER_NAME, (Filter) NoLocalFilter.INSTANCE);
+ }
+ else
+ {
+ return null;
+
+ }
+ }
+ else if(_noLocal)
+ {
+ Map<Symbol, Filter> filters = new HashMap<Symbol, Filter>();
+ filters.put(NO_LOCAL_FILTER_NAME, NoLocalFilter.INSTANCE);
+ filters.put(JMS_SELECTOR_FILTER_NAME, new JMSSelectorFilter(_selector));
+ return filters;
+ }
+ else
+ {
+ return Collections.singletonMap(JMS_SELECTOR_FILTER_NAME, (Filter)new JMSSelectorFilter(_selector));
+ }
+
+
+ }
+
+ public String getMessageSelector() throws JMSException
+ {
+ checkClosed();
+ return _selector;
+ }
+
+ public MessageListener getMessageListener() throws IllegalStateException
+ {
+ checkClosed();
+ return _messageListener;
+ }
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ checkClosed();
+ _messageListener = messageListener;
+ _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
+ {
+
+ public void messageArrived(final Receiver receiver)
+ {
+ _session.messageArrived(MessageConsumerImpl.this);
+ }
+ });
+ _session.messageListenerSet( this );
+
+ }
+
+ public MessageImpl receive() throws JMSException
+ {
+ checkClosed();
+ return receiveImpl(-1L);
+ }
+
+ public MessageImpl receive(final long timeout) throws JMSException
+ {
+ checkClosed();
+ // TODO - validate timeout > 0
+
+ return receiveImpl(timeout);
+ }
+
+ public MessageImpl receiveNoWait() throws JMSException
+ {
+ checkClosed();
+ return receiveImpl(0L);
+ }
+
+ private MessageImpl receiveImpl(long timeout) throws JMSException
+ {
+
+ org.apache.qpid.amqp_1_0.client.Message msg;
+ boolean redelivery;
+ if(_replaymessages.isEmpty())
+ {
+ checkReceiverError();
+ msg = receive0(timeout);
+ redelivery = false;
+ }
+ else
+ {
+ msg = _replaymessages.remove(0);
+ redelivery = true;
+ }
+
+ if(msg != null)
+ {
+ preReceiveAction(msg);
+ }
+ return createJMSMessage(msg, redelivery);
+ }
+
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(), receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
+ Message receive0(final long timeout)
+ {
+
+ Message message = _receiver.receive(timeout);
+ if(_session.getAckModeEnum() == Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
+ {
+ _recoverReplayMessages.add(message);
+ }
+ return message;
+ }
+
+
+ void acknowledge(final org.apache.qpid.amqp_1_0.client.Message msg)
+ {
+ _receiver.acknowledge(msg.getDeliveryTag(), _session.getTxn());
+ }
+
+ MessageImpl createJMSMessage(final Message msg, boolean redelivery)
+ {
+ if(msg != null)
+ {
+ MessageFactory factory = _session.getMessageFactory();
+ final MessageImpl message = factory.createMessage(_destination, msg);
+ message.setFromQueue(_isQueueConsumer);
+ message.setFromTopic(_isTopicSubscriber);
+ if(redelivery)
+ {
+ if(!message.getJMSRedelivered())
+ {
+ message.setJMSRedelivered(true);
+ }
+ }
+
+ return message;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public void close() throws JMSException
+ {
+ if(!_closed)
+ {
+ _closed = true;
+
+ closeUnderlyingReceiver(_receiver);
+
+ if(_destination instanceof TemporaryDestination)
+ {
+ ((TemporaryDestination)_destination).removeConsumer(this);
+ }
+ }
+ }
+
+ protected void closeUnderlyingReceiver(Receiver receiver)
+ {
+ receiver.close();
+ }
+
+ private void checkClosed() throws IllegalStateException
+ {
+ if(_closed)
+ {
+ throw new javax.jms.IllegalStateException("Closed");
+ }
+ }
+
+ void setLastUnackedMessage(final Binary deliveryTag)
+ {
+ _lastUnackedMessage = deliveryTag;
+ }
+
+ void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
+ {
+ int acknowledgeMode = _session.getAckModeEnum().ordinal();
+
+ if(acknowledgeMode == Session.AUTO_ACKNOWLEDGE
+ || acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE
+ || acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ acknowledge(msg);
+ if(acknowledgeMode == Session.SESSION_TRANSACTED)
+ {
+ _txnMsgs.add(msg.getDeliveryTag());
+ }
+ }
+ else if(acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ setLastUnackedMessage(msg.getDeliveryTag());
+ }
+ }
+
+ void acknowledgeAll()
+ {
+ if(_lastUnackedMessage != null)
+ {
+ Transaction txn = _session.getTxn();
+ _receiver.acknowledgeAll(_lastUnackedMessage, txn, null);
+ if(txn != null)
+ {
+ _lastTxnUpdate = _lastUnackedMessage;
+ }
+ _lastUnackedMessage = null;
+
+ }
+ _recoverReplayMessages.clear();
+ if(!_replaymessages.isEmpty())
+ {
+ _recoverReplayMessages.addAll(_replaymessages);
+ }
+ }
+
+ void postRollback()
+ {
+ if(_lastTxnUpdate != null)
+ {
+ final Modified outcome = new Modified();
+ outcome.setDeliveryFailed(true);
+ _receiver.updateAll(outcome, _lastTxnUpdate);
+ _lastTxnUpdate = null;
+ }
+ for(Binary tag : _txnMsgs)
+ {
+ _receiver.modified(tag);
+ }
+ _txnMsgs.clear();
+ }
+
+ void postCommit()
+ {
+ _lastTxnUpdate = null;
+ _txnMsgs.clear();
+ }
+
+ public DestinationImpl getDestination() throws IllegalStateException
+ {
+ checkClosed();
+ return _destination;
+ }
+
+
+ public SessionImpl getSession() throws IllegalStateException
+ {
+ checkClosed();
+ return _session;
+ }
+
+ public boolean getNoLocal() throws IllegalStateException
+ {
+ checkClosed();
+ return _noLocal;
+ }
+
+ public void start()
+ {
+ _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ }
+
+ public Queue getQueue() throws JMSException
+ {
+ return (Queue) getDestination();
+ }
+
+ public Topic getTopic() throws JMSException
+ {
+ return (Topic) getDestination();
+ }
+
+ void setQueueConsumer(final boolean queueConsumer)
+ {
+ _isQueueConsumer = queueConsumer;
+ }
+
+ void setTopicSubscriber(final boolean topicSubscriber)
+ {
+ _isTopicSubscriber = topicSubscriber;
+ }
+
+ String getLinkName()
+ {
+ return _linkName;
+ }
+
+ boolean isDurable()
+ {
+ return _durable;
+ }
+
+ void doRecover()
+ {
+ _replaymessages.clear();
+ if(!_recoverReplayMessages.isEmpty())
+ {
+ _replaymessages.addAll(_recoverReplayMessages);
+ for(Message msg : _replaymessages)
+ {
+ _session.messageArrived(this);
+ }
+ }
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1526190&r1=1526189&r2=1526190&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Wed Sep 25 14:33:06 2013
@@ -1,191 +1,191 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.amqp_1_0.jms.impl;
-
-import org.apache.qpid.amqp_1_0.client.Message;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
-import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-import java.util.*;
-
-class MessageFactory
-{
- private final SessionImpl _session;
-
-
- MessageFactory(final SessionImpl session)
- {
- _session = session;
- }
-
- public MessageImpl createMessage(final DestinationImpl destination, final Message msg)
- {
- MessageImpl message;
- List<Section> payload = msg.getPayload();
- Header header = null;
- MessageAnnotations messageAnnotations = null;
-
- Properties properties = null;
- ApplicationProperties appProperties = null;
- Footer footer;
-
- Iterator<Section> iter = payload.iterator();
- List<Section> body = new ArrayList<Section>();
-
- Section section = iter.hasNext() ? iter.next() : null;
-
- if(section instanceof Header)
- {
- header = (Header) section;
- section = iter.hasNext() ? iter.next() : null;
- }
-
- if(section instanceof MessageAnnotations)
- {
- messageAnnotations = (MessageAnnotations) section;
- section = iter.hasNext() ? iter.next() : null;
- }
-
- if(section instanceof Properties)
- {
- properties = (Properties) section;
- section = iter.hasNext() ? iter.next() : null;
- }
-
- if(section instanceof ApplicationProperties)
- {
- appProperties = (ApplicationProperties) section;
- section = iter.hasNext() ? iter.next() : null;
- }
-
- while(section != null && !(section instanceof Footer))
- {
- body.add(section);
- section = iter.hasNext() ? iter.next() : null;
- }
-
- footer = (Footer) section;
-
- if(body.size() == 1)
- {
- Section bodySection = body.get(0);
- if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
- {
- message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
- }
- else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
- {
- message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
- (List) ((AmqpValue)bodySection).getValue(), footer, _session);
- }
- else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
- {
- message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
- (String) ((AmqpValue)bodySection).getValue(), footer, _session);
- }
- else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary)
- {
-
- Binary value = (Binary) ((AmqpValue) bodySection).getValue();
- message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties,
- new Data(value), footer, _session);
- }
- else if(bodySection instanceof Data)
- {
- if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
- {
-
-
- message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties,
- (Data) bodySection,
- footer,
- _session);
- }
- else
- {
- message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
- }
- }
- else if(bodySection instanceof AmqpSequence)
- {
- message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
- }
-
- /*else if(bodySection instanceof AmqpDataSection)
- {
- AmqpDataSection dataSection = (AmqpDataSection) bodySection;
-
- List<Object> data = new ArrayList<Object>();
-
- ListIterator<Object> dataIter = dataSection.iterator();
-
- while(dataIter.hasNext())
- {
- data.add(dataIter.next());
- }
-
- if(data.size() == 1)
- {
- final Object obj = data.get(0);
- if( obj instanceof String)
- {
- message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
- }
- else if(obj instanceof JavaSerializable)
- {
- // TODO - ObjectMessage
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
- }
- else if(obj instanceof Serializable)
- {
- message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
- }
- else
- {
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
- }
- }
- else
- {
- // not a text message
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
- }
- }*/
- else
- {
- message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
- }
- }
- else
- {
- message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
- }
-
- message.setReadOnly();
-
- return message;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.jms.impl;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.*;
+
+class MessageFactory
+{
+ private final SessionImpl _session;
+
+
+ MessageFactory(final SessionImpl session)
+ {
+ _session = session;
+ }
+
+ public MessageImpl createMessage(final DestinationImpl destination, final Message msg)
+ {
+ MessageImpl message;
+ List<Section> payload = msg.getPayload();
+ Header header = null;
+ MessageAnnotations messageAnnotations = null;
+
+ Properties properties = null;
+ ApplicationProperties appProperties = null;
+ Footer footer;
+
+ Iterator<Section> iter = payload.iterator();
+ List<Section> body = new ArrayList<Section>();
+
+ Section section = iter.hasNext() ? iter.next() : null;
+
+ if(section instanceof Header)
+ {
+ header = (Header) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof MessageAnnotations)
+ {
+ messageAnnotations = (MessageAnnotations) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof Properties)
+ {
+ properties = (Properties) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ if(section instanceof ApplicationProperties)
+ {
+ appProperties = (ApplicationProperties) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ while(section != null && !(section instanceof Footer))
+ {
+ body.add(section);
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
+ footer = (Footer) section;
+
+ if(body.size() == 1)
+ {
+ Section bodySection = body.get(0);
+ if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
+ {
+ message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
+ {
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
+ (List) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
+ {
+ message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
+ (String) ((AmqpValue)bodySection).getValue(), footer, _session);
+ }
+ else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary)
+ {
+
+ Binary value = (Binary) ((AmqpValue) bodySection).getValue();
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties,
+ new Data(value), footer, _session);
+ }
+ else if(bodySection instanceof Data)
+ {
+ if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+ {
+
+
+ message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties,
+ (Data) bodySection,
+ footer,
+ _session);
+ }
+ else
+ {
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
+ }
+ }
+ else if(bodySection instanceof AmqpSequence)
+ {
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ }
+
+ /*else if(bodySection instanceof AmqpDataSection)
+ {
+ AmqpDataSection dataSection = (AmqpDataSection) bodySection;
+
+ List<Object> data = new ArrayList<Object>();
+
+ ListIterator<Object> dataIter = dataSection.iterator();
+
+ while(dataIter.hasNext())
+ {
+ data.add(dataIter.next());
+ }
+
+ if(data.size() == 1)
+ {
+ final Object obj = data.get(0);
+ if( obj instanceof String)
+ {
+ message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
+ }
+ else if(obj instanceof JavaSerializable)
+ {
+ // TODO - ObjectMessage
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ else if(obj instanceof Serializable)
+ {
+ message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
+ }
+ else
+ {
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ }
+ else
+ {
+ // not a text message
+ message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ }
+ }*/
+ else
+ {
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+ }
+ }
+ else
+ {
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
+ }
+
+ message.setReadOnly();
+
+ return message;
+ }
+}
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org