You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/08/22 01:16:45 UTC
svn commit: r568321 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
client/Session.java client/impl/ClientSession.java
client/impl/ClientSessionDelegate.java jms/MessageConsumerImpl.java
jms/QpidMessageListener.java
Author: rajith
Date: Tue Aug 21 16:16:44 2007
New Revision: 568321
URL: http://svn.apache.org/viewvc?rev=568321&view=rev
Log:
I added support in the JMS layer to figure out if it received any messages after calling flush
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Tue Aug 21 16:16:44 2007
@@ -233,7 +233,7 @@
* published them.
* <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
* ubscription can access the queue.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
*
* @param queue The queue this receiver is receiving messages from.
@@ -423,16 +423,6 @@
*/
public void messageRelease(RangeSet ranges);
-
- /**
- * Returns the number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- *
- * @return The number of message received for this session since
- * {@link Session#messageFlow} has bee invoked.
- */
- public int messagesReceived();
-
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -478,7 +468,7 @@
* declaring connection closes.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
* This field allows the client to assert the presence of a queue without modifying the server state.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
@@ -540,7 +530,7 @@
* <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
* If the queue has consumers the server does does not delete it but raises a channel exception instead.
- * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* </p>
* <p/>
@@ -569,7 +559,7 @@
* exchanges) are purged if/when a server restarts.
* <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
* The client can use this to check whether an exchange exists without modifying the server state.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option</p>
*
@@ -596,7 +586,7 @@
* <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
* exchange has queue bindings the server does not delete it but raises a channel exception
* instead.
- * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
* </ul>
* <p>In the absence of a particular option, the defaul value is false for each option
*
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java Tue Aug 21 16:16:44 2007
@@ -22,19 +22,7 @@
private ExceptionListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
-
- public int messagesReceived()
- {
- // TODO
- return 1;
- }
-
- @Override public void sessionClose()
- {
- super.sessionClose();
- }
-
+
public void messageAcknowledge(RangeSet ranges)
{
for (Range range : ranges)
@@ -97,6 +85,10 @@
public void setMessageListener(String destination, MessagePartListener listener)
{
+ if (listener == null)
+ {
+ throw new IllegalArgumentException("Cannot set message listener to null");
+ }
_messageListeners.put(destination, listener);
}
@@ -104,8 +96,6 @@
{
_exceptionListner = exceptionListner;
}
-
- // ugly but nessacery
void setAccquiredMessages(RangeSet acquiredMessages)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java Tue Aug 21 16:16:44 2007
@@ -9,7 +9,6 @@
import org.apache.qpidity.MessageTransfer;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Range;
-import org.apache.qpidity.RangeSet;
import org.apache.qpidity.Session;
import org.apache.qpidity.SessionClosed;
import org.apache.qpidity.SessionDelegate;
@@ -54,16 +53,7 @@
_currentTransfer = currentTransfer;
_currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
_currentMessageListener.messageTransfer(currentTransfer.getId());
-
- //a better way is to tell the broker to stop the transfer
- if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
- {
- RangeSet transfers = new RangeSet();
- transfers.add(_currentTransfer.getId());
- session.messageRelease(transfers);
- }
}
-
@Override public void messageReject(Session session, MessageReject struct)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Tue Aug 21 16:16:44 2007
@@ -17,6 +17,8 @@
*/
package org.apache.qpidity.jms;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
@@ -94,6 +96,8 @@
* Nether exceed MAX_MESSAGE_TRANSFERRED
*/
private int _messageAsyncrhonouslyReceived = 0;
+
+ private AtomicBoolean _messageReceived = new AtomicBoolean();
//----- Constructors
/**
@@ -354,7 +358,6 @@
// This indicate to the delivery thread to deliver the message to this consumer
// as it can happens that a message is delivered after a receive operation as returned.
_isReceiving = true;
- int received = 0;
if (!_isStopped)
{
// if this consumer is stopped then this will be call when starting
@@ -362,10 +365,13 @@
.messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
1);
getSession().getQpidSession().messageFlush(getMessageActorID());
+ _messageReceived.set(false);
+
+ //When sync() returns we know whether we have received a message or not.
getSession().getQpidSession().sync();
- received = getSession().getQpidSession().messagesReceived();
+ //received = getSession().getQpidSession().messagesReceived();
}
- if (received == 0 && timeout < 0)
+ if (_messageReceived.get() && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
result = null;
@@ -501,9 +507,12 @@
.messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
- getSession().getQpidSession().sync();
- int received = getSession().getQpidSession().messagesReceived();
- if (received == 0 && _isNoWaitIsReceiving)
+ _messageReceived.set(false);
+
+ // When sync() returns we know whether we have received a message or not.
+ getSession().getQpidSession().sync();
+
+ if (_messageReceived.get() && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
@@ -631,5 +640,10 @@
getSession().getQpidSession().messageAcknowledge(ranges);
getSession().testQpidException();
}
+ }
+
+ public void notifyMessageReceived()
+ {
+ _messageReceived.set(true);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=568321&r1=568320&r2=568321&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java Tue Aug 21 16:16:44 2007
@@ -62,6 +62,9 @@
{
try
{
+ // to be used with flush
+ _consumer.notifyMessageReceived();
+
//convert this message into a JMS one
QpidMessage jmsMessage = MessageFactory.getQpidMessage(message);
// if consumer is asynchronous then send this message to its session.