You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/14 23:36:09 UTC
svn commit: r1624911 -
/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
Author: rgodfrey
Date: Sun Sep 14 21:36:08 2014
New Revision: 1624911
URL: http://svn.apache.org/r1624911
Log:
QPID-6098 : [JMS AMQP 1.0 Client] use content-type of incoming messages to drive JMS message type
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1624911&r1=1624910&r2=1624911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Sun Sep 14 21:36:08 2014
@@ -19,14 +19,22 @@
package org.apache.qpid.amqp_1_0.jms.impl;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
@@ -133,23 +141,76 @@ class MessageFactory
}
else if(bodySection instanceof Data)
{
- if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+ Data dataSection = (Data) bodySection;
+
+ Symbol contentType = properties == null ? null : properties.getContentType();
+
+ if(ObjectMessageImpl.CONTENT_TYPE.equals(contentType))
{
message = new ObjectMessageImpl(header,
deliveryAnnotations,
messageAnnotations, properties, appProperties,
- (Data) bodySection,
+ dataSection,
footer,
_session);
}
+ else if(contentType != null)
+ {
+ ContentType contentTypeObj = parseContentType(contentType.toString());
+ // todo : content encoding - e.g. gzip
+ String contentTypeType = contentTypeObj.getType();
+ String contentTypeSubType = contentTypeObj.getSubType();
+ if ("text".equals(contentTypeType)
+ || ("application".equals(contentTypeType)
+ && contentTypeSubType != null
+ && ("xml".equals(contentTypeSubType)
+ || "json".equals(contentTypeSubType)
+ || "xml-dtd".equals(contentTypeSubType)
+ || "javascript".equals(contentTypeSubType)
+ || contentTypeSubType.endsWith("+xml")
+ || contentTypeSubType.endsWith("+json"))))
+ {
+ Charset charset;
+ if (contentTypeObj.getParameterNames().contains("charset"))
+ {
+ charset = Charset.forName(contentTypeObj.getParameter("charset").toUpperCase());
+ }
+ else
+ {
+ charset = StandardCharsets.US_ASCII;
+ }
+ Binary binary = dataSection.getValue();
+ String data =
+ new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), charset);
+ message = new TextMessageImpl(header, deliveryAnnotations, messageAnnotations, properties,
+ appProperties, data, footer, _session);
+ }
+ else
+ {
+ message = new BytesMessageImpl(header,
+ deliveryAnnotations,
+ messageAnnotations,
+ properties,
+ appProperties,
+ dataSection,
+ footer,
+ _session);
+ }
+ }
else
{
message = new BytesMessageImpl(header,
deliveryAnnotations,
- messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
+ messageAnnotations,
+ properties,
+ appProperties,
+ dataSection,
+ footer,
+ _session);
}
+
}
else if(bodySection instanceof AmqpSequence)
{
@@ -158,63 +219,126 @@ class MessageFactory
messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session
);
}
-
- /*else if(bodySection instanceof AmqpDataSection)
+ else
{
- AmqpDataSection dataSection = (AmqpDataSection) bodySection;
+ message = new AmqpMessageImpl(header,
+ deliveryAnnotations,
+ messageAnnotations, properties,appProperties,body,footer, _session);
+ }
+ }
+ else
+ {
+ message = new AmqpMessageImpl(header,
+ deliveryAnnotations,
+ messageAnnotations, properties,appProperties,body,footer, _session);
+ }
- List<Object> data = new ArrayList<Object>();
+ message.setReadOnly();
- ListIterator<Object> dataIter = dataSection.iterator();
+ return message;
+ }
- while(dataIter.hasNext())
- {
- data.add(dataIter.next());
- }
- if(data.size() == 1)
+ static interface ContentType
+ {
+ String getType();
+ String getSubType();
+ Set<String> getParameterNames();
+ String getParameter(String name);
+ }
+
+ static ContentType parseContentType(String contentType)
+ {
+ int subTypeSeparator = contentType.indexOf("/");
+ final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
+ String subTypePart = contentType.substring(subTypeSeparator +1).toLowerCase().trim();
+ if(subTypePart.contains(";"))
+ {
+ subTypePart = subTypePart.substring(0,subTypePart.indexOf(";")).trim();
+ }
+ if(subTypePart.contains("("))
+ {
+ subTypePart = subTypePart.substring(0,subTypePart.indexOf("(")).trim();
+ }
+ final String subType = subTypePart;
+ final Map<String,String> parameters = new HashMap<>();
+
+ if(contentType.substring(subTypeSeparator +1).contains(";"))
+ {
+ parseContentTypeParameters(contentType.substring(contentType.indexOf(";",subTypeSeparator +1)+1), parameters);
+ }
+ return new ContentType()
{
- final Object obj = data.get(0);
- if( obj instanceof String)
+ @Override
+ public String getType()
{
- message = new TextMessageImpl(header,properties,appProperties,(String) data.get(0),footer, _session);
+ return type;
}
- else if(obj instanceof JavaSerializable)
+
+ @Override
+ public String getSubType()
{
- // TODO - ObjectMessage
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ return subType;
}
- else if(obj instanceof Serializable)
+
+ @Override
+ public Set<String> getParameterNames()
{
- message = new ObjectMessageImpl(header,properties,footer,appProperties,(Serializable)obj, _session);
+ return Collections.unmodifiableMap(parameters).keySet();
}
- else
+
+ @Override
+ public String getParameter(final String name)
{
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ return parameters.get(name);
}
+ };
+ }
+
+ private static void parseContentTypeParameters(final String parameterString, final Map<String, String> parameters)
+ {
+ int equalsIndex = parameterString.indexOf("=");
+ if(equalsIndex != -1)
+ {
+ String paramName = parameterString.substring(0,equalsIndex).trim();
+ String valuePart = equalsIndex == parameterString.length() - 1 ? "" : parameterString.substring(equalsIndex+1).trim();
+ String remainder;
+ if(valuePart.startsWith("\""))
+ {
+ int closeQuoteIndex = valuePart.indexOf("\"", 1);
+ if(closeQuoteIndex != -1)
+ {
+ parameters.put(paramName, valuePart.substring(1, closeQuoteIndex));
+ remainder = (closeQuoteIndex == valuePart.length()-1) ? "" : valuePart.substring(closeQuoteIndex+1);
}
else
{
- // not a text message
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ remainder = "";
}
- }*/
+ }
else
{
- message = new AmqpMessageImpl(header,
- deliveryAnnotations,
- messageAnnotations, properties,appProperties,body,footer, _session);
+ Pattern pattern = Pattern.compile("\\s|;|\\(");
+ Matcher matcher = pattern.matcher(valuePart);
+ if(matcher.matches())
+ {
+ parameters.put(paramName, valuePart.substring(0,matcher.start()));
+ remainder = valuePart.substring(matcher.start());
+ }
+ else
+ {
+ parameters.put(paramName, valuePart);
+ remainder = "";
+ }
}
- }
- else
- {
- message = new AmqpMessageImpl(header,
- deliveryAnnotations,
- messageAnnotations, properties,appProperties,body,footer, _session);
- }
- message.setReadOnly();
+ int paramSep = remainder.indexOf(";");
+ if(paramSep != -1 && paramSep != remainder.length()-1)
+ {
+ parseContentTypeParameters(remainder.substring(paramSep+1),parameters);
+ }
+ }
- return message;
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org