You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by ni...@apache.org on 2008/08/31 06:49:48 UTC
svn commit: r690638 [2/2] - in /cxf/trunk/rt/transports/jms/src:
main/java/org/apache/cxf/transport/jms/
test/java/org/apache/cxf/transport/jms/
Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Sat Aug 30 21:49:48 2008
@@ -19,20 +19,29 @@
package org.apache.cxf.transport.jms;
-
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Enumeration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.CastUtils;
public final class JMSUtils {
@@ -53,27 +62,26 @@
String name = (String)props.nextElement();
String value = env.getProperty(name);
LOG.log(Level.FINE, "Context property: " + name + " | " + value);
- }
+ }
}
-
+
Context context = new InitialContext(env);
return context;
}
-
protected static void populateContextEnvironment(AddressType addrType, Properties env) {
-
- java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
+
+ java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator();
while (listIter.hasNext()) {
JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next();
-
+
if (null != propertyPair.getValue()) {
env.setProperty(propertyPair.getName(), propertyPair.getValue());
}
}
- }
+ }
public static int getJMSDeliveryMode(JMSMessageHeadersType headers) {
int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
@@ -108,8 +116,8 @@
return correlationId;
}
- public static void setMessageProperties(JMSMessageHeadersType headers,
- Message message) throws JMSException {
+ public static void setMessageProperties(JMSMessageHeadersType headers, Message message)
+ throws JMSException {
if (headers != null && headers.isSetProperty()) {
List<JMSPropertyType> props = headers.getProperty();
@@ -118,5 +126,146 @@
}
}
}
-
+
+ /**
+ * Create a JMS of the appropriate type populated with the given payload.
+ *
+ * @param payload the message payload, expected to be either of type String or byte[] depending on payload
+ * type
+ * @param session the JMS session
+ * @param replyTo the ReplyTo destination if any
+ * @return a JMS of the appropriate type populated with the given payload
+ */
+ public static Message marshal(Object payload, Session session, Destination replyTo, String messageType)
+ throws JMSException {
+ Message message = null;
+
+ if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) {
+ message = session.createTextMessage((String)payload);
+ } else if (JMSConstants.BYTE_MESSAGE_TYPE.equals(messageType)) {
+ message = session.createBytesMessage();
+ ((BytesMessage)message).writeBytes((byte[])payload);
+ } else {
+ message = session.createObjectMessage();
+ ((ObjectMessage)message).setObject((byte[])payload);
+ }
+
+ if (replyTo != null) {
+ message.setJMSReplyTo(replyTo);
+ }
+
+ return message;
+ }
+
+ /**
+ * Unmarshal the payload of an incoming message.
+ *
+ * @param message the incoming message
+ * @return the unmarshalled message payload, either of type String or byte[] depending on payload type
+ */
+ public static Object unmarshal(Message message) throws JMSException {
+ Object ret = null;
+
+ if (message instanceof TextMessage) {
+ ret = ((TextMessage)message).getText();
+ } else if (message instanceof BytesMessage) {
+ byte[] retBytes = new byte[(int)((BytesMessage)message).getBodyLength()];
+ ((BytesMessage)message).readBytes(retBytes);
+ ret = retBytes;
+ } else {
+ ret = (byte[])((ObjectMessage)message).getObject();
+ }
+
+ return ret;
+ }
+
+ public static JMSMessageHeadersType populateIncomingContext(javax.jms.Message message,
+ org.apache.cxf.message.Message inMessage,
+ String headerType) throws JMSException {
+ JMSMessageHeadersType headers = null;
+
+ headers = (JMSMessageHeadersType)inMessage.get(headerType);
+
+ if (headers == null) {
+ headers = new JMSMessageHeadersType();
+ inMessage.put(headerType, headers);
+ }
+
+ headers.setJMSCorrelationID(message.getJMSCorrelationID());
+ headers.setJMSDeliveryMode(new Integer(message.getJMSDeliveryMode()));
+ headers.setJMSExpiration(new Long(message.getJMSExpiration()));
+ headers.setJMSMessageID(message.getJMSMessageID());
+ headers.setJMSPriority(new Integer(message.getJMSPriority()));
+ headers.setJMSRedelivered(Boolean.valueOf(message.getJMSRedelivered()));
+ headers.setJMSTimeStamp(new Long(message.getJMSTimestamp()));
+ headers.setJMSType(message.getJMSType());
+
+ Map<String, List<String>> protHeaders = new HashMap<String, List<String>>();
+ List<JMSPropertyType> props = headers.getProperty();
+ Enumeration enm = message.getPropertyNames();
+ while (enm.hasMoreElements()) {
+ String name = (String)enm.nextElement();
+ String val = message.getStringProperty(name);
+ JMSPropertyType prop = new JMSPropertyType();
+ prop.setName(name);
+ prop.setValue(val);
+ props.add(prop);
+
+ protHeaders.put(name, Collections.singletonList(val));
+ if (name.equals(org.apache.cxf.message.Message.CONTENT_TYPE) && val != null) {
+ inMessage.put(org.apache.cxf.message.Message.CONTENT_TYPE, val);
+ }
+
+ }
+ inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders);
+ return headers;
+ }
+
+ protected static void addProtocolHeaders(Message message, Map<String, List<String>> headers)
+ throws JMSException {
+ if (headers == null) {
+ return;
+ }
+ StringBuilder value = new StringBuilder(256);
+ for (Map.Entry<String, List<String>> entry : headers.entrySet()) {
+ value.setLength(0);
+ boolean first = true;
+ for (String s : entry.getValue()) {
+ if (!first) {
+ value.append("; ");
+ }
+ value.append(s);
+ first = false;
+ }
+ message.setStringProperty(entry.getKey(), value.toString());
+ }
+ }
+
+ public static Map<String, List<String>> getSetProtocolHeaders(org.apache.cxf.message.Message message) {
+ Map<String, List<String>> headers = CastUtils.cast((Map<?, ?>)message
+ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS));
+ if (null == headers) {
+ headers = new HashMap<String, List<String>>();
+ message.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers);
+ }
+ return headers;
+ }
+
+ public static void setContentToProtocalHeader(org.apache.cxf.message.Message message) {
+ String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+
+ Map<String, List<String>> headers = JMSUtils.getSetProtocolHeaders(message);
+ if (headers.get(org.apache.cxf.message.Message.CONTENT_TYPE) == null) {
+ List<String> ct = new ArrayList<String>();
+ ct.add(contentType);
+ headers.put(org.apache.cxf.message.Message.CONTENT_TYPE, ct);
+ } else {
+ List<String> ct = headers.get(org.apache.cxf.message.Message.CONTENT_TYPE);
+ ct.add(contentType);
+ }
+ }
+
+ public static boolean isDestinationStyleQueue(AddressType address) {
+ return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value());
+ }
}
Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=690638&r1=690637&r2=690638&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Sat Aug 30 21:49:48 2008
@@ -140,9 +140,9 @@
JMSConduit conduit = setupJMSConduit(true, false);
Message msg = new MessageImpl();
conduit.prepare(msg);
- PooledSession sess = conduit.base.sessionFactory.get(true);
+ PooledSession sess = conduit.sessionFactory.get(true);
byte [] b = testMsg.getBytes();
- javax.jms.Message message = conduit.base.marshal(b,
+ javax.jms.Message message = JMSUtils.marshal(b,
sess.session(),
null, JMSConstants.BYTE_MESSAGE_TYPE);