You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/15 00:08:14 UTC

svn commit: r487383 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: BasicMessageProducer.java message/JMSBytesMessage.java

Author: rgreig
Date: Thu Dec 14 15:08:13 2006
New Revision: 487383

URL: http://svn.apache.org/viewvc?view=rev&rev=487383
Log: (empty)

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=487383&r1=487382&r2=487383
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Dec 14 15:08:13 2006
@@ -24,11 +24,13 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 
 import javax.jms.*;
 import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -229,9 +231,11 @@
     {
     	checkPreConditions();
     	checkInitialDestination();
+
+
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -240,9 +244,10 @@
     {
     	checkPreConditions();
     	checkInitialDestination();
+
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -253,7 +258,7 @@
     	checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, immediate);
         }
     }
@@ -265,7 +270,7 @@
     	checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
                      _immediate);
         }
     }
@@ -277,7 +282,7 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -291,7 +296,7 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -305,7 +310,7 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, _immediate);
         }
     }
@@ -319,7 +324,7 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, immediate);
         }
     }
@@ -334,11 +339,146 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, immediate, waitUntilSent);
         }
     }
 
+
+    /**
+     * Converts an arbitrary JMS message into this client's native representation so that it can be sent.
+     * <p>
+     * A message that is already an AbstractJMSMessage is returned unchanged. Any other implementation is copied
+     * into a new message created by the session: the body is copied according to the JMS message interface it
+     * implements (bytes, map, object, text or stream; anything else yields a message without a body), followed
+     * by its properties and its delivery mode, priority, reply-to, type and correlation id headers.
+     * <p>
+     * Copying the body calls reset() on foreign bytes and stream messages.
+     *
+     * @param message the message passed to one of the send methods
+     * @return the message itself if it is native, otherwise a native copy of it
+     * @throws JMSException if reading the foreign message or populating the native copy fails
+     */
+    private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
+    {
+        if(message instanceof AbstractJMSMessage)
+        {
+            return (AbstractJMSMessage) message;
+        }
+
+        AbstractJMSMessage newMessage;
+
+        if(message instanceof BytesMessage)
+        {
+            BytesMessage bytesMessage = (BytesMessage) message;
+            // Rewind first so that the body is read from its beginning.
+            bytesMessage.reset();
+
+            JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
+
+            byte[] buf = new byte[1024];
+            int len;
+
+            // The loop ends when readBytes reports -1, i.e. nothing is left to read.
+            while((len = bytesMessage.readBytes(buf)) != -1)
+            {
+                nativeMsg.writeBytes(buf, 0, len);
+            }
+
+            newMessage = nativeMsg;
+        }
+        else if(message instanceof MapMessage)
+        {
+            MapMessage origMessage = (MapMessage) message;
+            MapMessage nativeMessage = _session.createMapMessage();
+
+            Enumeration mapNames = origMessage.getMapNames();
+            while(mapNames.hasMoreElements())
+            {
+                String name = (String) mapNames.nextElement();
+                nativeMessage.setObject(name, origMessage.getObject(name));
+            }
+            newMessage = (AbstractJMSMessage) nativeMessage;
+        }
+        else if(message instanceof ObjectMessage)
+        {
+            ObjectMessage origMessage = (ObjectMessage) message;
+            ObjectMessage nativeMessage = _session.createObjectMessage();
+
+            nativeMessage.setObject(origMessage.getObject());
+
+            newMessage = (AbstractJMSMessage) nativeMessage;
+        }
+        else if(message instanceof TextMessage)
+        {
+            TextMessage origMessage = (TextMessage) message;
+            TextMessage nativeMessage = _session.createTextMessage();
+
+            nativeMessage.setText(origMessage.getText());
+
+            newMessage = (AbstractJMSMessage) nativeMessage;
+        }
+        else if(message instanceof StreamMessage)
+        {
+            StreamMessage origMessage = (StreamMessage) message;
+            StreamMessage nativeMessage = _session.createStreamMessage();
+
+            try
+            {
+                origMessage.reset();
+                while(true)
+                {
+                    nativeMessage.writeObject(origMessage.readObject());
+                }
+            }
+            catch (MessageEOFException e)
+            {
+                // Expected: the end of the stream is signalled by this exception, which ends the copy loop.
+            }
+            newMessage = (AbstractJMSMessage) nativeMessage;
+        }
+        else
+        {
+            // None of the body-carrying message interfaces: only properties and headers are copied below.
+            newMessage = (AbstractJMSMessage) _session.createMessage();
+        }
+
+        Enumeration propertyNames = message.getPropertyNames();
+        while(propertyNames.hasMoreElements())
+        {
+            String propertyName = String.valueOf(propertyNames.nextElement());
+            // NOTE(review): JMS-defined property names start with "JMSX" (no underscore), so this filter may
+            // not skip them as presumably intended - confirm before changing the prefix.
+            if(!propertyName.startsWith("JMSX_"))
+            {
+                Object value = message.getObjectProperty(propertyName);
+                newMessage.setObjectProperty(propertyName, value);
+            }
+        }
+
+        newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+        // Clamp the foreign priority into the 0-9 range before applying it.
+        newMessage.setJMSPriority(Math.max(0, Math.min(9, message.getJMSPriority())));
+
+        if(message.getJMSReplyTo() != null)
+        {
+            newMessage.setJMSReplyTo(message.getJMSReplyTo());
+        }
+        newMessage.setJMSType(message.getJMSType());
+
+        // The correlation id is set by the sending application, so it has to travel with the copy too.
+        String correlationId = message.getJMSCorrelationID();
+        if(correlationId != null)
+        {
+            newMessage.setJMSCorrelationID(correlationId);
+        }
+
+        return newMessage;
+    }
+
+
+
     private void validateDestination(Destination destination) throws JMSException
     {
         if (!(destination instanceof AMQDestination))
@@ -349,7 +489,7 @@
         declareDestination((AMQDestination)destination);
     }
 
-    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
                             long timeToLive, boolean mandatory, boolean immediate) throws JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
@@ -358,7 +498,7 @@
     /**
      * The caller of this method must hold the failover mutex.
      * @param destination
-     * @param message
+     * @param origMessage
      * @param deliveryMode
      * @param priority
      * @param timeToLive
@@ -366,9 +506,11 @@
      * @param immediate
      * @throws JMSException
      */
-    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+    protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
                             long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
+
+        AbstractJMSMessage message = convertToNativeMessage(origMessage);
         AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
                                                                 destination.getRoutingKey(), mandatory, immediate);
 
@@ -424,6 +566,17 @@
         frames[1] = contentHeaderFrame;
         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
         _protocolHandler.writeFrame(compositeFrame, wait);
+
+
+        if(message != origMessage)
+        {
+            _logger.warn("Updating original message");
+            origMessage.setJMSPriority(message.getJMSPriority());
+            origMessage.setJMSTimestamp(message.getJMSTimestamp());
+            _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+            origMessage.setJMSExpiration(message.getJMSExpiration());
+            origMessage.setJMSMessageID(message.getJMSMessageID());
+        }
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=487383&r1=487382&r2=487383
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Dec 14 15:08:13 2006
@@ -30,6 +30,9 @@
 import javax.jms.MessageEOFException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CharsetDecoder;
+import java.nio.CharBuffer;
 
 public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
 {
@@ -149,10 +152,27 @@
         checkReadable();
         // we check only for one byte since theoretically the string could be only a
         // single byte when using UTF-8 encoding
-        checkAvailable(1);
+
         try
         {
-            return _data.getString(Charset.forName("UTF-8").newDecoder());
+            short length = readShort();
+            if(length == 0)
+            {
+                return "";
+            }
+            else
+            {
+                CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+                ByteBuffer encodedString = _data.slice();
+                encodedString.limit(length);
+                _data.position(_data.position()+length);
+                CharBuffer string = decoder.decode(encodedString.buf());
+                
+                return string.toString();
+            }
+
+
+            
         }
         catch (CharacterCodingException e)
         {
@@ -257,9 +277,15 @@
         checkWritable();
         try
         {
-            _data.putString(string, Charset.forName("UTF-8").newEncoder());
+            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+            java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+            
+            _data.putShort((short)encodedString.limit());
+            _data.put(encodedString);
+
+            //_data.putString(string, Charset.forName("UTF-8").newEncoder());
             // we must add the null terminator manually
-            _data.put((byte)0);
+            //_data.put((byte)0);
         }
         catch (CharacterCodingException e)
         {