You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/14 23:36:09 UTC

svn commit: r1624911 - /qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java

Author: rgodfrey
Date: Sun Sep 14 21:36:08 2014
New Revision: 1624911

URL: http://svn.apache.org/r1624911
Log:
QPID-6098 : [JMS AMQP 1.0 Client] use content-type of incoming messages to drive JMS message type

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1624911&r1=1624910&r2=1624911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Sun Sep 14 21:36:08 2014
@@ -19,14 +19,22 @@
 
 package org.apache.qpid.amqp_1_0.jms.impl;
 
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
 import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
@@ -133,23 +141,76 @@ class MessageFactory
             }
             else if(bodySection instanceof Data)
             {
-                if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+                Data dataSection = (Data) bodySection;
+
+                Symbol contentType = properties == null ? null : properties.getContentType();
+
+                if(ObjectMessageImpl.CONTENT_TYPE.equals(contentType))
                 {
 
 
                     message = new ObjectMessageImpl(header,
                                                     deliveryAnnotations,
                                                     messageAnnotations, properties, appProperties,
-                                                    (Data) bodySection,
+                                                    dataSection,
                                                     footer,
                                                     _session);
                 }
+                else if(contentType != null)
+                {
+                    ContentType contentTypeObj = parseContentType(contentType.toString());
+                    // todo : content encoding - e.g. gzip
+                    String contentTypeType = contentTypeObj.getType();
+                    String contentTypeSubType = contentTypeObj.getSubType();
+                    if ("text".equals(contentTypeType)
+                        || ("application".equals(contentTypeType)
+                            && contentTypeSubType != null
+                            && ("xml".equals(contentTypeSubType)
+                                || "json".equals(contentTypeSubType)
+                                || "xml-dtd".equals(contentTypeSubType)
+                                || "javascript".equals(contentTypeSubType)
+                                || contentTypeSubType.endsWith("+xml")
+                                || contentTypeSubType.endsWith("+json"))))
+                    {
+                        Charset charset;
+                        if (contentTypeObj.getParameterNames().contains("charset"))
+                        {
+                            charset = Charset.forName(contentTypeObj.getParameter("charset").toUpperCase());
+                        }
+                        else
+                        {
+                            charset = StandardCharsets.US_ASCII;
+                        }
+                        Binary binary = dataSection.getValue();
+                        String data =
+                                new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), charset);
+                        message = new TextMessageImpl(header, deliveryAnnotations, messageAnnotations, properties,
+                                                      appProperties, data, footer, _session);
+                    }
+                    else
+                    {
+                        message = new BytesMessageImpl(header,
+                                                       deliveryAnnotations,
+                                                       messageAnnotations,
+                                                       properties,
+                                                       appProperties,
+                                                       dataSection,
+                                                       footer,
+                                                       _session);
+                    }
+                }
                 else
                 {
                     message = new BytesMessageImpl(header,
                                                    deliveryAnnotations,
-                                                   messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
+                                                   messageAnnotations,
+                                                   properties,
+                                                   appProperties,
+                                                   dataSection,
+                                                   footer,
+                                                   _session);
                 }
+
             }
             else if(bodySection instanceof AmqpSequence)
             {
@@ -158,63 +219,126 @@ class MessageFactory
                                                 messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session
                 );
             }
-
-            /*else if(bodySection instanceof AmqpDataSection)
+            else
             {
-                AmqpDataSection dataSection = (AmqpDataSection) bodySection;
+                message = new AmqpMessageImpl(header,
+                                              deliveryAnnotations,
+                                              messageAnnotations, properties,appProperties,body,footer, _session);
+            }
+        }
+        else
+        {
+            message = new AmqpMessageImpl(header,
+                                          deliveryAnnotations,
+                                          messageAnnotations, properties,appProperties,body,footer, _session);
+        }
 
-                List<Object> data = new ArrayList<Object>();
+        message.setReadOnly();
 
-                ListIterator<Object> dataIter = dataSection.iterator();
+        return message;
+    }
 
-                while(dataIter.hasNext())
-                {
-                    data.add(dataIter.next());
-                }
 
-                if(data.size() == 1)
+    static interface ContentType
+    {
+        String getType();
+        String getSubType();
+        Set<String> getParameterNames();
+        String getParameter(String name);
+    }
+
+    static ContentType parseContentType(String contentType)
+    {
+        int subTypeSeparator = contentType.indexOf("/");
+        final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
+        String subTypePart = contentType.substring(subTypeSeparator +1).toLowerCase().trim();
+        if(subTypePart.contains(";"))
+        {
+            subTypePart = subTypePart.substring(0,subTypePart.indexOf(";")).trim();
+        }
+        if(subTypePart.contains("("))
+        {
+            subTypePart = subTypePart.substring(0,subTypePart.indexOf("(")).trim();
+        }
+        final String subType = subTypePart;
+        final Map<String,String> parameters = new HashMap<>();
+
+        if(contentType.substring(subTypeSeparator +1).contains(";"))
+        {
+            parseContentTypeParameters(contentType.substring(contentType.indexOf(";",subTypeSeparator +1)+1), parameters);
+        }
+        return  new ContentType()
                 {
-                    final Object obj = data.get(0);
-                    if( obj instanceof String)
+                    @Override
+                    public String getType()
                     {
-                        message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
+                        return type;
                     }
-                    else if(obj instanceof JavaSerializable)
+
+                    @Override
+                    public String getSubType()
                     {
-                        // TODO - ObjectMessage
-                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                        return subType;
                     }
-                    else if(obj instanceof Serializable)
+
+                    @Override
+                    public Set<String> getParameterNames()
                     {
-                        message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
+                        return Collections.unmodifiableMap(parameters).keySet();
                     }
-                    else
+
+                    @Override
+                    public String getParameter(final String name)
                     {
-                        message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                        return parameters.get(name);
                     }
+                };
+    }
+
+    private static void parseContentTypeParameters(final String parameterString, final Map<String, String> parameters)
+    {
+        int equalsIndex = parameterString.indexOf("=");
+        if(equalsIndex != -1)
+        {
+            String paramName = parameterString.substring(0,equalsIndex).trim();
+            String valuePart = equalsIndex == parameterString.length() - 1 ? "" : parameterString.substring(equalsIndex+1).trim();
+            String remainder;
+            if(valuePart.startsWith("\""))
+            {
+                int closeQuoteIndex = valuePart.indexOf("\"", 1);
+                if(closeQuoteIndex != -1)
+                {
+                    parameters.put(paramName, valuePart.substring(1, closeQuoteIndex));
+                    remainder = (closeQuoteIndex == valuePart.length()-1) ? "" : valuePart.substring(closeQuoteIndex+1);
                 }
                 else
                 {
-                    // not a text message
-                    message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                    remainder = "";
                 }
-            }*/
+            }
             else
             {
-                message = new AmqpMessageImpl(header,
-                                              deliveryAnnotations,
-                                              messageAnnotations, properties,appProperties,body,footer, _session);
+                Pattern pattern = Pattern.compile("\\s|;|\\(");
+                Matcher matcher = pattern.matcher(valuePart);
+                if(matcher.matches())
+                {
+                    parameters.put(paramName, valuePart.substring(0,matcher.start()));
+                    remainder = valuePart.substring(matcher.start());
+                }
+                else
+                {
+                    parameters.put(paramName, valuePart);
+                    remainder = "";
+                }
             }
-        }
-        else
-        {
-            message = new AmqpMessageImpl(header,
-                                          deliveryAnnotations,
-                                          messageAnnotations, properties,appProperties,body,footer, _session);
-        }
 
-        message.setReadOnly();
+            int paramSep = remainder.indexOf(";");
+            if(paramSep != -1 && paramSep != remainder.length()-1)
+            {
+                parseContentTypeParameters(remainder.substring(paramSep+1),parameters);
+            }
+        }
 
-        return message;
     }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org