You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/11/12 17:30:25 UTC

svn commit: r594204 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: nclient/JMSTestCase.java nclient/impl/ClientSession.java njms/XAResourceImpl.java

Author: rajith
Date: Mon Nov 12 08:30:24 2007
New Revision: 594204

URL: http://svn.apache.org/viewvc?rev=594204&view=rev
Log:
Chnaged the sync logic in the client session to ignore message headers
Fixed a compilation error in XAResourceImpl

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Mon Nov 12 08:30:24 2007
@@ -1,6 +1,7 @@
-package org.apache.qpidity.nclient;
+ package org.apache.qpidity.nclient;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.framing.AMQShortString;
 
@@ -17,27 +18,14 @@
 
             javax.jms.Session ssn = con.createSession(false, 1);
 
-            javax.jms.Destination dest = new AMQTopic(new AMQShortString("amq.topic"),"myTopic");
-            javax.jms.MessageProducer prod = ssn.createProducer(dest);
-            javax.jms.MessageConsumer cons = ssn.createConsumer(dest,"targetMessage = TRUE");
+            javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
+            javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
 
-            javax.jms.TextMessage msg = ssn.createTextMessage();
-            msg.setText("This is a test message");
-            msg.setBooleanProperty("targetMessage", false);
-            prod.send(msg);
+            javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive();
 
-            msg.setBooleanProperty("targetMessage", true);
-            prod.send(msg);
-
-            javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait();
-
-            if (m == null)
-            {
-               System.out.println("message is null");
-            }
-            else
+            if (m != null)
             {
-               System.out.println("message is not null");
+               System.out.println("Message"  + m);
             }
         }
         catch(Exception e)
@@ -45,4 +33,23 @@
             e.printStackTrace();
         }
     }
+
+    /* javax.jms.TextMessage msg = ssn.createTextMessage();
+    msg.setText("This is a test message");
+    msg.setBooleanProperty("targetMessage", false);
+    prod.send(msg);
+
+    msg.setBooleanProperty("targetMessage", true);
+    prod.send(msg);
+
+    javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait();
+
+    if (m == null)
+    {
+       System.out.println("message is null");
+    }
+    else
+    {
+       System.out.println("message is not null"  + m);
+    }*/
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Nov 12 08:30:24 2007
@@ -2,28 +2,27 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
-import java.nio.ByteBuffer;
 
-import org.apache.qpidity.transport.Option;
 import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.Range;
-import org.apache.qpidity.transport.RangeSet;
 import org.apache.qpidity.api.Message;
 import org.apache.qpidity.nclient.ClosedListener;
 import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
 
 /**
- * Implements a Qpid Sesion. 
+ * Implements a Qpid Sesion.
  */
 public class ClientSession extends org.apache.qpidity.transport.Session implements  org.apache.qpidity.nclient.DtxSession
 {
     static
     {
         MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
-        String max = "message_size_before_sync";       
+        String max = "message_size_before_sync";
         if (System.getProperties().containsKey(max))
         {
             try
@@ -37,12 +36,12 @@
         }
     }
 
-    private static  long MAX_NOT_SYNC_DATA_LENGH  ;
+    private static  long MAX_NOT_SYNC_DATA_LENGH;
     private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
     private ClosedListener _exceptionListner;
     private RangeSet _acquiredMessages;
     private RangeSet _rejectedMessages;
-    private long _dataSentNotSync;
+    private long _currentDataSizeNotSynced;
 
 
     public void messageAcknowledge(RangeSet ranges)
@@ -65,21 +64,38 @@
         // The javadoc clearly says that this method is suitable for small messages
         // therefore reading the content in one shot.
         ByteBuffer  data = msg.readData();
-        _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
         super.messageTransfer(destination, confirmMode, acquireMode);
         super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
         super.data( data );
         super.endData();
-        if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
-        {
-            sync();
-        }
     }
 
     public void sync()
     {
-        _dataSentNotSync = 0;
         super.sync();
+        _currentDataSizeNotSynced = 0;
+    }
+
+    /* -------------------------
+     * Data methods
+     * ------------------------*/
+
+    public void data(ByteBuffer buf)
+    {
+        _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+        super.data(buf);
+    }
+
+    public void data(String str)
+    {
+        _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
+        super.data(str);
+    }
+
+    public void data(byte[] bytes)
+    {
+        _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
+        super.data(bytes);
     }
 
     public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
@@ -89,7 +105,7 @@
         boolean b = true;
         int count = 0;
         while(b)
-        {   
+        {
             try
             {
                 System.out.println("count : " + count++);
@@ -99,11 +115,20 @@
             {
                 b = false;
             }
-        }   
-        
+        }
+
         super.endData();
     }
-    
+
+    public void endData()
+    {
+        super.endData();
+        if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
+        {
+            sync();
+        }
+    }
+
     public RangeSet getAccquiredMessages()
     {
         return _acquiredMessages;
@@ -113,36 +138,36 @@
     {
         return _rejectedMessages;
     }
-    
+
     public void setMessageListener(String destination, MessagePartListener listener)
     {
         if (listener == null)
         {
             throw new IllegalArgumentException("Cannot set message listener to null");
         }
-        _messageListeners.put(destination, listener);       
+        _messageListeners.put(destination, listener);
     }
-    
+
     public void setClosedListener(ClosedListener exceptionListner)
     {
-        _exceptionListner = exceptionListner;        
-    }   
-    
+        _exceptionListner = exceptionListner;
+    }
+
     void setAccquiredMessages(RangeSet acquiredMessages)
     {
         _acquiredMessages = acquiredMessages;
     }
-    
+
     void setRejectedMessages(RangeSet rejectedMessages)
     {
         _rejectedMessages = rejectedMessages;
     }
-    
+
     void notifyException(QpidException ex)
     {
         _exceptionListner.onClosed(null, null);
     }
-    
+
     Map<String,MessagePartListener> getMessageListerners()
     {
         return _messageListeners;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java Mon Nov 12 08:30:24 2007
@@ -5,9 +5,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -327,15 +327,15 @@
      */
     public Xid[] recover(int flag) throws XAException
     {
-        // the flag is ignored 
+        // the flag is ignored
         Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover();
         DtxCoordinationRecoverResult res = future.get();
         // todo make sure that the keys of the returned map are the xids
         Xid[] result = new Xid[res.getInDoubt().size()];
         int i = 0;
-        try
+      /*  try
         {
-            for (Object xid : res.getInDoubt())
+          /*  for (Object xid : res.getInDoubt())
             {
                 result[i] = new XidImpl((String) xid);
                 i++;
@@ -348,7 +348,7 @@
                 _logger.debug("Cannot convert string into Xid ", e);
             }
             throw new XAException(XAException.XAER_PROTO);
-        }
+        }*/
         return result;
     }