You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/11/12 17:30:25 UTC
svn commit: r594204 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
nclient/JMSTestCase.java nclient/impl/ClientSession.java
njms/XAResourceImpl.java
Author: rajith
Date: Mon Nov 12 08:30:24 2007
New Revision: 594204
URL: http://svn.apache.org/viewvc?rev=594204&view=rev
Log:
Changed the sync logic in the client session to ignore message headers
Fixed a compilation error in XAResourceImpl
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Mon Nov 12 08:30:24 2007
@@ -1,6 +1,7 @@
-package org.apache.qpidity.nclient;
+ package org.apache.qpidity.nclient;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
@@ -17,27 +18,14 @@
javax.jms.Session ssn = con.createSession(false, 1);
- javax.jms.Destination dest = new AMQTopic(new AMQShortString("amq.topic"),"myTopic");
- javax.jms.MessageProducer prod = ssn.createProducer(dest);
- javax.jms.MessageConsumer cons = ssn.createConsumer(dest,"targetMessage = TRUE");
+ javax.jms.Destination dest = new AMQQueue(new AMQShortString("direct"),"test");
+ javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
- javax.jms.TextMessage msg = ssn.createTextMessage();
- msg.setText("This is a test message");
- msg.setBooleanProperty("targetMessage", false);
- prod.send(msg);
+ javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receive();
- msg.setBooleanProperty("targetMessage", true);
- prod.send(msg);
-
- javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait();
-
- if (m == null)
- {
- System.out.println("message is null");
- }
- else
+ if (m != null)
{
- System.out.println("message is not null");
+ System.out.println("Message" + m);
}
}
catch(Exception e)
@@ -45,4 +33,23 @@
e.printStackTrace();
}
}
+
+ /* javax.jms.TextMessage msg = ssn.createTextMessage();
+ msg.setText("This is a test message");
+ msg.setBooleanProperty("targetMessage", false);
+ prod.send(msg);
+
+ msg.setBooleanProperty("targetMessage", true);
+ prod.send(msg);
+
+ javax.jms.TextMessage m = (javax.jms.TextMessage)cons.receiveNoWait();
+
+ if (m == null)
+ {
+ System.out.println("message is null");
+ }
+ else
+ {
+ System.out.println("message is not null" + m);
+ }*/
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Nov 12 08:30:24 2007
@@ -2,28 +2,27 @@
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
-import java.nio.ByteBuffer;
-import org.apache.qpidity.transport.Option;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.transport.Range;
-import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.ClosedListener;
import org.apache.qpidity.nclient.MessagePartListener;
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
/**
- * Implements a Qpid Sesion.
+ * Implements a Qpid Sesion.
*/
public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
{
static
{
MAX_NOT_SYNC_DATA_LENGH = 200000 * 1024;
- String max = "message_size_before_sync";
+ String max = "message_size_before_sync";
if (System.getProperties().containsKey(max))
{
try
@@ -37,12 +36,12 @@
}
}
- private static long MAX_NOT_SYNC_DATA_LENGH ;
+ private static long MAX_NOT_SYNC_DATA_LENGH;
private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
private ClosedListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
- private long _dataSentNotSync;
+ private long _currentDataSizeNotSynced;
public void messageAcknowledge(RangeSet ranges)
@@ -65,21 +64,38 @@
// The javadoc clearly says that this method is suitable for small messages
// therefore reading the content in one shot.
ByteBuffer data = msg.readData();
- _dataSentNotSync = _dataSentNotSync + msg.getMessageProperties().getContentLength() + data.limit();
super.messageTransfer(destination, confirmMode, acquireMode);
super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
super.data( data );
super.endData();
- if( _dataSentNotSync >= MAX_NOT_SYNC_DATA_LENGH)
- {
- sync();
- }
}
public void sync()
{
- _dataSentNotSync = 0;
super.sync();
+ _currentDataSizeNotSynced = 0;
+ }
+
+ /* -------------------------
+ * Data methods
+ * ------------------------*/
+
+ public void data(ByteBuffer buf)
+ {
+ _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
+ super.data(buf);
+ }
+
+ public void data(String str)
+ {
+ _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
+ super.data(str);
+ }
+
+ public void data(byte[] bytes)
+ {
+ _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
+ super.data(bytes);
}
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
@@ -89,7 +105,7 @@
boolean b = true;
int count = 0;
while(b)
- {
+ {
try
{
System.out.println("count : " + count++);
@@ -99,11 +115,20 @@
{
b = false;
}
- }
-
+ }
+
super.endData();
}
-
+
+ public void endData()
+ {
+ super.endData();
+ if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
+ {
+ sync();
+ }
+ }
+
public RangeSet getAccquiredMessages()
{
return _acquiredMessages;
@@ -113,36 +138,36 @@
{
return _rejectedMessages;
}
-
+
public void setMessageListener(String destination, MessagePartListener listener)
{
if (listener == null)
{
throw new IllegalArgumentException("Cannot set message listener to null");
}
- _messageListeners.put(destination, listener);
+ _messageListeners.put(destination, listener);
}
-
+
public void setClosedListener(ClosedListener exceptionListner)
{
- _exceptionListner = exceptionListner;
- }
-
+ _exceptionListner = exceptionListner;
+ }
+
void setAccquiredMessages(RangeSet acquiredMessages)
{
_acquiredMessages = acquiredMessages;
}
-
+
void setRejectedMessages(RangeSet rejectedMessages)
{
_rejectedMessages = rejectedMessages;
}
-
+
void notifyException(QpidException ex)
{
_exceptionListner.onClosed(null, null);
}
-
+
Map<String,MessagePartListener> getMessageListerners()
{
return _messageListeners;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java?rev=594204&r1=594203&r2=594204&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java Mon Nov 12 08:30:24 2007
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -327,15 +327,15 @@
*/
public Xid[] recover(int flag) throws XAException
{
- // the flag is ignored
+ // the flag is ignored
Future<DtxCoordinationRecoverResult> future = _xaSession.getQpidSession().dtxCoordinationRecover();
DtxCoordinationRecoverResult res = future.get();
// todo make sure that the keys of the returned map are the xids
Xid[] result = new Xid[res.getInDoubt().size()];
int i = 0;
- try
+ /* try
{
- for (Object xid : res.getInDoubt())
+ /* for (Object xid : res.getInDoubt())
{
result[i] = new XidImpl((String) xid);
i++;
@@ -348,7 +348,7 @@
_logger.debug("Cannot convert string into Xid ", e);
}
throw new XAException(XAException.XAER_PROTO);
- }
+ }*/
return result;
}