You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2006/12/20 14:22:28 UTC

svn commit: r489082 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: ./ message/

Author: ritchiem
Date: Wed Dec 20 05:22:27 2006
New Revision: 489082

URL: http://svn.apache.org/viewvc?view=rev&rev=489082
Log:
QPID-233

Applied patch from Rupert Smith

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Dec 20 05:22:27 2006
@@ -24,6 +24,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
@@ -31,7 +32,6 @@
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -72,15 +72,15 @@
     private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
     /**
-     *  Used to reference durable subscribers so they requests for unsubscribe can be handled
-     *  correctly.  Note this only keeps a record of subscriptions which have been created
-     *  in the current instance.  It does not remember subscriptions between executions of the
-     *  client
+     * Used to reference durable subscribers so they requests for unsubscribe can be handled
+     * correctly.  Note this only keeps a record of subscriptions which have been created
+     * in the current instance.  It does not remember subscriptions between executions of the
+     * client
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
     private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-                new ConcurrentHashMap<BasicMessageConsumer, String>();
+            new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -319,7 +319,7 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -335,7 +335,7 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -351,7 +351,7 @@
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -367,7 +367,7 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -383,7 +383,7 @@
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -401,7 +401,7 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -418,7 +418,7 @@
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -435,7 +435,7 @@
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -505,7 +505,7 @@
     {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
@@ -570,7 +570,7 @@
      */
     public void closed(Throwable e)
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -722,11 +722,11 @@
 
     public void acknowledge() throws JMSException
     {
-        if(isClosed())
+        if (isClosed())
         {
             throw new IllegalStateException("Session is already closed");
         }
-        for(BasicMessageConsumer consumer : _consumers.values())
+        for (BasicMessageConsumer consumer : _consumers.values())
         {
             consumer.acknowledge();
         }
@@ -1078,10 +1078,9 @@
         String tag = Integer.toString(_nextTag++);
 
         FieldTable arguments = FieldTableFactory.newFieldTable();
-        if (messageSelector != null)
+        if (messageSelector != null && !messageSelector.equals(""))
         {
-            //fixme move literal value to a common class.
-            arguments.put("x-filter-jms-selector", messageSelector);
+            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
         }
 
         consumer.setConsumerTag(tag);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Dec 20 05:22:27 2006
@@ -47,7 +47,8 @@
 
     protected ByteBuffer _data;
     private boolean _readableProperties = false;
-    private boolean _readableMessage = false;
+    protected boolean _readableMessage = false;
+    protected boolean _changedData;
     private Destination _destination;
     private BasicMessageConsumer _consumer;
 
@@ -61,6 +62,7 @@
         }
         _readableProperties = false;
         _readableMessage = (data != null);
+        _changedData = (data == null);
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -522,16 +524,16 @@
         return !_readableMessage;
     }
 
-    public void reset() 
+    public void reset()
     {
-        if (_readableMessage)
+        if (!_changedData)
         {
             _data.rewind();
         }
         else
         {
             _data.flip();
-            _readableMessage = true;
+            _changedData = false;
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Wed Dec 20 05:22:27 2006
@@ -59,6 +59,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -226,48 +232,56 @@
     public void writeBoolean(boolean b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b ? (byte) 1 : (byte) 0);
     }
 
     public void writeByte(byte b) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.put(b);
     }
 
     public void writeShort(short i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putShort(i);
     }
 
     public void writeChar(char c) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putChar(c);
     }
 
     public void writeInt(int i) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putInt(i);
     }
 
     public void writeLong(long l) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putLong(l);
     }
 
     public void writeFloat(float v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putFloat(v);
     }
 
     public void writeDouble(double v) throws JMSException
     {
         checkWritable();
+        _changedData = true;
         _data.putDouble(v);
     }
 
@@ -281,7 +295,7 @@
             
             _data.putShort((short)encodedString.limit());
             _data.put(encodedString);
-
+            _changedData = true;
             //_data.putString(string, Charset.forName("UTF-8").newEncoder());
             // we must add the null terminator manually
             //_data.put((byte)0);
@@ -298,12 +312,14 @@
     {
         checkWritable();
         _data.put(bytes);
+        _changedData = true;
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
     {
         checkWritable();
         _data.put(bytes, offset, length);
+        _changedData = true;
     }
 
     public void writeObject(Object object) throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Wed Dec 20 05:22:27 2006
@@ -112,7 +112,7 @@
         }
 
     }
-
+  
     public Serializable getObject() throws JMSException
     {
         ObjectInputStream in = null;
@@ -123,18 +123,18 @@
 
         try
         {
-        	_data.rewind();
+            _data.rewind();
             in = new ObjectInputStream(_data.asInputStream());
             return (Serializable) in.readObject();
         }
         catch (IOException e)
-        {           
-           e.printStackTrace();
-           throw new MessageFormatException("Could not deserialize message: " + e);
+        {
+            e.printStackTrace();
+            throw new MessageFormatException("Could not deserialize message: " + e);
         }
         catch (ClassNotFoundException e)
         {
-        	e.printStackTrace();
+            e.printStackTrace();
             throw new MessageFormatException("Could not deserialize message: " + e);
         }
         finally

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Wed Dec 20 05:22:27 2006
@@ -86,6 +86,12 @@
         super(messageNbr, contentHeader, data);
     }
 
+    public void reset()
+    {
+        super.reset();
+        _readableMessage = true;
+    }
+
     public String getMimeType()
     {
         return MIME_TYPE;
@@ -103,6 +109,7 @@
     {
         checkWritable();
         _data.put(type);
+        _changedData = true;
     }
 
     public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@
             {
                 _data.putString(string, Charset.forName("UTF-8").newEncoder());
                 // we must write the null terminator ourselves
-                _data.put((byte)0);
+                _data.put((byte) 0);
             }
             catch (CharacterCodingException e)
             {
@@ -706,7 +713,7 @@
 
     public void writeBytes(byte[] bytes) throws JMSException
     {
-        writeBytes(bytes, 0, bytes == null?0:bytes.length);
+        writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
     }
 
     public void writeBytes(byte[] bytes, int offset, int length) throws JMSException

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=489082&r1=489081&r2=489082
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Wed Dec 20 05:22:27 2006
@@ -117,6 +117,7 @@
                 {
                     _data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
                 }
+                _changedData=true;
             }
             _decodedValue = text;
         }