You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/20 14:50:20 UTC
svn commit: r567674 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
client/Session.java client/impl/ClientSession.java
jms/MessageConsumerImpl.java jms/SessionImpl.java
jms/message/QpidMessage.java
Author: arnaudsimon
Date: Mon Aug 20 05:50:19 2007
New Revision: 567674
URL: http://svn.apache.org/viewvc?rev=567674&view=rev
Log:
added sync
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java Mon Aug 20 05:50:19 2007
@@ -326,8 +326,7 @@
/**
* Forces the broker to exhaust its credit supply.
* <p> The broker's credit will always be zero when
- * this method completes. This method does not complete until all the message transfers occur.
- * <p> This method returns the number of flushed messages.
+ * this method completes.
*
* @param destination The destination to call flush on.
*/
@@ -424,6 +423,16 @@
*/
public void messageRelease(RangeSet ranges);
+
+ /**
+ * Returns the number of messages received for this session since
+ * {@link Session#messageFlow} has been invoked.
+ *
+ * @return The number of messages received for this session since
+ * {@link Session#messageFlow} has been invoked.
+ */
+ public int messagesReceived();
+
// -----------------------------------------------
// Local transaction methods
// ----------------------------------------------
@@ -568,7 +577,7 @@
* @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
* exchange types define the functionality of the exchange - i.e. how messages are routed
* through it. It is not valid or meaningful to attempt to change the type of an existing
- * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
* @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
* the message will be sent.
* @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java Mon Aug 20 05:50:19 2007
@@ -25,7 +25,14 @@
private ExceptionListener _exceptionListner;
private RangeSet _acquiredMessages;
private RangeSet _rejectedMessages;
-
+
+
+ public int messagesReceived()
+ {
+ // TODO: placeholder - always reports one received message until the real count is implemented
+ return 1;
+ }
+
@Override public void sessionClose()
{
super.sessionClose();
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java Mon Aug 20 05:50:19 2007
@@ -130,7 +130,8 @@
{
// this is a queue we expect that this queue exists
getSession().getQpidSession()
- .messageSubscribe(destination.getQpidQueueName(), getMessageActorID(),
+ .messageSubscribe(destination.getQpidQueueName(), // queue
+ getMessageActorID(), // destination
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// When the message selctor is set we do not acquire the messages
_messageSelector != null ? org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE : org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
@@ -156,8 +157,7 @@
else
{
// this is a non durable subscriber
- // create a temporary queue
- queueName = "topic-" + getMessageActorID();
+ queueName = destination.getQpidQueueName();
getSession().getQpidSession()
.queueDeclare(queueName, null, null, Option.AUTO_DELETE, Option.EXCLUSIVE);
}
@@ -169,8 +169,8 @@
.messageSubscribe(queueName, getMessageActorID(),
org.apache.qpidity.client.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
// We always acquire the messages
- org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, messageAssembler, null,
- _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
+ org.apache.qpidity.client.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ messageAssembler, null, _noLocal ? Option.NO_LOCAL : Option.NO_OPTION,
// Request exclusive subscription access, meaning only this subscription
// can access the queue.
Option.EXCLUSIVE);
@@ -179,6 +179,12 @@
// set the flow mode
getSession().getQpidSession()
.messageFlowMode(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_MODE_CREDIT);
+ getSession().getQpidSession().sync();
+ // check for an exception: getCurrentException() resets the stored exception
+ // when it is read, so it must be read exactly once (reading it a second time
+ // for the throw would yield null); testQpidException() does that single read
+ testQpidException();
}
//----- Message consumer API
@@ -353,11 +359,13 @@
{
// if this consumer is stopped then this will be call when starting
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
- // received = getSession().getQpidSession().
+ getSession().getQpidSession().sync();
+ received = getSession().getQpidSession().messagesReceived();
}
- if ( received == 0 && timeout < 0)
+ if (received == 0 && timeout < 0)
{
// this is a nowait and we havent received a message then we must immediatly return
result = null;
@@ -425,7 +433,8 @@
// there is a synch call waiting for a message to be delivered
// so tell the broker to deliver a message
getSession().getQpidSession()
- .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
+ .messageFlow(getMessageActorID(), org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ 1);
getSession().getQpidSession().messageFlush(getMessageActorID());
}
}
@@ -490,8 +499,10 @@
getSession().getQpidSession()
.messageFlow(getMessageActorID(),
org.apache.qpidity.client.Session.MESSAGE_FLOW_UNIT_MESSAGE, 1);
- int received = 0; //getSession().getQpidSession().messageFlush(getMessageActorID());
- if ( received == 0 && _isNoWaitIsReceiving)
+ getSession().getQpidSession().messageFlush(getMessageActorID());
+ getSession().getQpidSession().sync();
+ int received = getSession().getQpidSession().messagesReceived();
+ if (received == 0 && _isNoWaitIsReceiving)
{
// Right a message nowait is waiting for a message
// but no one can be delivered it then need to return
@@ -570,9 +581,10 @@
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- //ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageRelease(ranges);
+ getSession().getQpidSession().sync();
+ testQpidException();
}
}
@@ -589,15 +601,17 @@
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
- getSession().getQpidSession().messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession()
+ .messageAcquire(ranges, org.apache.qpidity.client.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);
+ getSession().getQpidSession().sync();
RangeSet acquired = getSession().getQpidSession().getAccquiredMessages();
if (acquired.size() > 0)
{
- result = true; // todo acquired.iterator().next().getLower() == message.getMessageID();
+ result = true;
}
+ testQpidException();
}
return result;
}
@@ -613,9 +627,19 @@
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getSession().getQpidSession().messageAcknowledge(ranges);
+ getSession().getQpidSession().sync();
+ testQpidException();
+ }
+ }
+
+ private void testQpidException() throws QpidException
+ {
+ QpidException qe = getSession().getCurrentException(); // read once: the session getter resets the stored exception
+ if (qe != null)
+ {
+ throw qe; // surface the pending qpid exception to the caller
}
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java Mon Aug 20 05:50:19 2007
@@ -25,6 +25,8 @@
import javax.jms.*;
import javax.jms.IllegalStateException;
+import javax.jms.MessageListener;
+import javax.jms.Session;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.HashMap;
@@ -108,6 +110,11 @@
private org.apache.qpidity.client.Session _qpidSession;
/**
+ * The latest qpid Exception that has been raised.
+ */
+ private QpidException _currentException;
+
+ /**
* Indicates whether this session is recovering
*/
private boolean _inRecovery = false;
@@ -142,6 +149,8 @@
// create the qpid session with an expiry <= 0 so that the session does not expire
_qpidSession = _connection.getQpidConnection().createSession(0);
+ // set the exception listener for this session
+ _qpidSession.setExceptionListener(new QpidSessionExceptionListener());
// set transacted if required
if (_transacted && !isXA)
{
@@ -431,8 +440,7 @@
{
// release this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageRelease(ranges);
}
}
@@ -817,6 +825,17 @@
checkNotClosed();
}
+ /**
+ * Get the latest thrown exception and reset it, so that it is reported only once.
+ *
+ * @return The latest thrown exception, or null if none has been recorded since the last call.
+ */
+ public synchronized QpidException getCurrentException()
+ {
+ QpidException result = _currentException;
+ _currentException = null;
+ return result;
+ }
//----- Protected methods
/**
@@ -1006,8 +1025,7 @@
{
// acknowledge this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageAcknowledge(ranges);
}
//tobedone: Implement DUPS OK heuristic
@@ -1035,8 +1053,7 @@
{
// acknowledge this message
RangeSet ranges = new RangeSet();
- // TODO: messageID is a string but range need a long???
- // ranges.add(message.getMessageID());
+ ranges.add(message.getMessageTransferId());
getQpidSession().messageAcknowledge(ranges);
}
//empty the list of unack messages
@@ -1092,6 +1109,22 @@
}
//------ Inner classes
+
+ /**
+ * Listener for qpid protocol exceptions. The latest exception is stored on the
+ * enclosing session so that it can be retrieved through getCurrentException().
+ */
+ private class QpidSessionExceptionListener implements org.apache.qpidity.client.ExceptionListener
+ {
+ /**
+ * Records the given exception as the current exception of the enclosing session.
+ *
+ * @param exception The qpid exception that has been raised.
+ */
+ public void onException(QpidException exception)
+ {
+ // Lock on the enclosing session: getCurrentException() is a synchronized
+ // method of SessionImpl, so locking on this listener would not exclude it.
+ synchronized (SessionImpl.this)
+ {
+ //todo check the error code for finding out if we need to notify the
+ // JMS connection exception listener
+ _currentException = exception;
+ }
+ }
+ }
/**
* Convenient class for storing incoming messages associated with a consumer ID.
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java?rev=567674&r1=567673&r2=567674&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/message/QpidMessage.java Mon Aug 20 05:50:19 2007
@@ -369,6 +369,7 @@
/**
* Get this message excahgne name
+ *
* @return this message excahgne name
*/
public String getExchangeName()
@@ -406,6 +407,16 @@
public org.apache.qpidity.api.Message getQpidityMessage()
{
return _qpidityMessage;
+ }
+
+ /**
+ * Get this message transfer ID, as held by the underlying qpidity message.
+ *
+ * @return This message transfer ID.
+ */
+ public long getMessageTransferId()
+ {
+ return _qpidityMessage.getMessageTransferId();
}
}