You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/26 18:08:40 UTC
svn commit: r532788 - in /incubator/qpid/trunk/qpid: ./
java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: ritchiem
Date: Thu Apr 26 09:08:39 2007
New Revision: 532788
URL: http://svn.apache.org/viewvc?view=rev&rev=532788
Log:
Merged revisions 532786 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r532786 | ritchiem | 2007-04-26 16:59:24 +0100 (Thu, 26 Apr 2007) | 3 lines
QPID-466 Create STRICT_AMQP System property to disable JMS extensions in Java client.
This disables the JMS features that rely upon Qpid Java broker specific features.
........
Modified:
incubator/qpid/trunk/qpid/ (props changed)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=532788&r1=532787&r2=532788
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Apr 26 09:08:39 2007
@@ -202,11 +202,20 @@
/** Boolean to control immediate prefetch. Records the first call to the dispatcher to prevent further flow(true) */
private final AtomicBoolean _firstDispatcher = new AtomicBoolean(true);
+ /** System property to enable strict AMQP compliance */
+ public static final String STRICT_AMQP = "STRICT_AMQP";
+ /** Strict AMQP default */
+ public static final String STRICT_AMQP_DEFAULT = "false";
+
+ private final boolean _strictAMQP;
+
+
/** System property to enable immediate message prefetching */
public static final String IMMEDIATE_PREFETCH = "IMMEDIATE_PREFETCH";
/** Immediate message prefetch default */
public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
+ private final boolean _immediatePrefetch;
private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
@@ -435,6 +444,10 @@
AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
{
+
+ _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT));
+ _immediatePrefetch = Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT));
+
_connection = con;
_transacted = transacted;
if (transacted)
@@ -921,15 +934,27 @@
_dispatcher.rollback();
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- false) // requeue
- , BasicRecoverOkBody.class);
+ if (isStrictAMQP())
+ {
+ // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+ _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ false)); // requeue
+ _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+ }
+ else
+ {
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _connection.getProtocolHandler().syncWrite(BasicRecoverBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ false) // requeue
+ , BasicRecoverOkBody.class);
+ }
if (!isSuspended)
{
suspendChannel(false);
@@ -1433,7 +1458,6 @@
private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
boolean nowait, String messageSelector) throws AMQException
{
- //fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
@@ -1709,11 +1733,21 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
checkNotClosed();
checkValidQueue(queue);
return new AMQQueueBrowser(this, (AMQQueue) queue, messageSelector);
@@ -1762,6 +1796,11 @@
boolean isQueueBound(AMQShortString exchangeName, AMQShortString queueName, AMQShortString routingKey) throws JMSException
{
+ if (isStrictAMQP())
+ {
+ throw new UnsupportedOperationException();
+ }
+
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame boundFrame = ExchangeBoundBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
@@ -1940,7 +1979,7 @@
synchronized void startDistpatcherIfNecessary()
{
// If IMMEDIATE_PREFETCH is not set then we need to start fetching
- if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// We do this now if this is the first call on a started connection
if (isSuspended() && _startedAtLeastOnce.get() && _firstDispatcher.getAndSet(false))
@@ -2005,7 +2044,7 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
// If IMMEDIATE_PREFETCH is not required then suspend the channel to delay prefetch
- if (!Boolean.parseBoolean(System.getProperties().getProperty(IMMEDIATE_PREFETCH, IMMEDIATE_PREFETCH_DEFAULT)))
+ if (!_immediatePrefetch)
{
// The dispatcher will be null if we have just created this session
// so suspend the channel before we register our consumer so that we don't
@@ -2390,6 +2429,11 @@
_connection.getProtocolHandler().writeFrame(basicRejectBody);
}
+ }
+
+ public boolean isStrictAMQP()
+ {
+ return _strictAMQP;
}
}