You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/01 11:20:41 UTC
svn commit: r1295495 [2/4] - in
/qpid/branches/rg-amqp-1-0-sandbox/qpid/java: ./ bdbstore/ bdbstore/bin/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/testclient/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/testclien...
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Thu Mar 1 10:20:36 2012
@@ -65,4 +65,16 @@ public interface AMQConnectionDelegate
ProtocolVersion getProtocolVersion();
boolean verifyClientID() throws JMSException, AMQException;
+
+ /**
+ * Tests whether the server has advertised support for the specified feature
+ * via the qpid.features server connection property. By convention the feature name
+ * will begin <code>qpid.</code> followed by one or more words separated by minus signs
+ * e.g. qpid.jms-selector.
+ *
+ * @param featureName name of feature.
+ *
+ * @return true if the feature is supported by the server
+ */
+ boolean isSupportedServerFeature(final String featureName);
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Mar 1 10:20:36 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.client;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -20,6 +19,7 @@ package org.apache.qpid.client;
*
*/
+package org.apache.qpid.client;
import java.io.IOException;
import java.util.ArrayList;
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
@@ -63,16 +64,12 @@ public class AMQConnectionDelegate_0_10
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
/**
- * The name of the UUID property
- */
- private static final String UUID_NAME = "qpid.federation_tag";
- /**
* The AMQ Connection.
*/
- private AMQConnection _conn;
+ private final AMQConnection _conn;
/**
- * The QpidConeection instance that is mapped with thie JMS connection.
+ * The QpidConnection instance that is mapped to this JMS connection.
*/
org.apache.qpid.transport.Connection _qpidConnection;
private ConnectionException exception = null;
@@ -281,24 +278,29 @@ public class AMQConnectionDelegate_0_10
{
_conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
- try
+ _qpidConnection.notifyFailoverRequired();
+
+ synchronized (_conn.getFailoverMutex())
{
- if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ try
{
- _conn.failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- return;
+ if (_conn.firePreFailover(false) && _conn.attemptReconnection())
+ {
+ _conn.failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ return;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
+ finally
+ {
+ _conn.getProtocolHandler().getFailoverLatch().countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
}
- }
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
}
}
@@ -324,6 +326,18 @@ public class AMQConnectionDelegate_0_10
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
+ if (_conn.isFailingOver())
+ {
+ try
+ {
+ _conn.blockUntilNotFailingOver();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
try
{
return operation.execute();
@@ -352,7 +366,32 @@ public class AMQConnectionDelegate_0_10
public String getUUID()
{
- return (String)_qpidConnection.getServerProperties().get(UUID_NAME);
+ return (String)_qpidConnection.getServerProperties().get(ServerPropertyNames.FEDERATION_TAG);
+ }
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(final String featureName)
+ {
+ if (featureName == null)
+ {
+ throw new IllegalArgumentException("featureName cannot be null");
+ }
+ final Map<String, Object> serverProperties = _qpidConnection.getServerProperties();
+ boolean featureSupported = false;
+ if (serverProperties != null && serverProperties.containsKey(ServerPropertyNames.QPID_FEATURES))
+ {
+ final Object supportServerFeatures = serverProperties.get(ServerPropertyNames.QPID_FEATURES);
+ featureSupported = supportServerFeatures instanceof List && ((List<String>)supportServerFeatures).contains(featureName);
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Server support for feature '" + featureName + "' : " + featureSupported);
+ }
+
+ return featureSupported;
}
private ConnectionSettings retriveConnectionSettings(BrokerDetails brokerDetail)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Thu Mar 1 10:20:36 2012
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.UnresolvedAddressException;
import java.security.GeneralSecurityException;
-import java.security.Security;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.EnumSet;
@@ -44,6 +43,7 @@ import org.apache.qpid.client.protocol.A
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateWaiter;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.ChannelOpenBody;
@@ -66,7 +66,7 @@ import org.slf4j.LoggerFactory;
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
- private AMQConnection _conn;
+ private final AMQConnection _conn;
public void closeConnection(long timeout) throws JMSException, AMQException
@@ -379,4 +379,14 @@ public class AMQConnectionDelegate_8_0 i
{
return true;
}
+
+ /*
+ * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
+ */
+ public boolean isSupportedServerFeature(String featureName)
+ {
+ // The Qpid Java Broker 0-8..0-9-1 does not advertise features by the qpid.features property, so for now
+ // we just hardcode JMS selectors as supported.
+ return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java Thu Mar 1 10:20:36 2012
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.net.URISyntaxException;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination;
import javax.naming.NamingException;
@@ -59,7 +60,7 @@ public abstract class AMQDestination imp
private boolean _browseOnly;
- private boolean _isAddressResolved;
+ private AtomicLong _addressResolved = new AtomicLong(0);
private AMQShortString _queueName;
@@ -77,7 +78,7 @@ public abstract class AMQDestination imp
public static final int QUEUE_TYPE = 1;
public static final int TOPIC_TYPE = 2;
public static final int UNKNOWN_TYPE = 3;
-
+
// ----- Fields required to support new address syntax -------
public enum DestSyntax {
@@ -740,12 +741,12 @@ public abstract class AMQDestination imp
public boolean isAddressResolved()
{
- return _isAddressResolved;
+ return _addressResolved.get() > 0;
}
- public void setAddressResolved(boolean addressResolved)
+ public void setAddressResolved(long addressResolved)
{
- _isAddressResolved = addressResolved;
+ _addressResolved.set(addressResolved);
}
private static Address createAddressFromString(String str)
@@ -823,7 +824,7 @@ public abstract class AMQDestination imp
dest.setTargetNode(_targetNode);
dest.setSourceNode(_sourceNode);
dest.setLink(_link);
- dest.setAddressResolved(_isAddressResolved);
+ dest.setAddressResolved(_addressResolved.get());
return dest;
}
@@ -836,4 +837,9 @@ public abstract class AMQDestination imp
{
_isDurable = b;
}
+
+ public boolean isResolvedAfter(long time)
+ {
+ return _addressResolved.get() > time;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar 1 10:20:36 2012
@@ -89,9 +89,9 @@ import org.apache.qpid.client.message.Un
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -308,7 +308,7 @@ public abstract class AMQSession<C exten
protected final FlowControllingBlockingQueue _queue;
/** Holds the highest received delivery tag. */
- private final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
+ protected final AtomicLong _highestDeliveryTag = new AtomicLong(-1);
private final AtomicLong _rollbackMark = new AtomicLong(-1);
/** All the not yet acknowledged message tags */
@@ -534,7 +534,7 @@ public abstract class AMQSession<C exten
{
_queue = new FlowControllingBlockingQueue(_prefetchHighMark, null);
}
-
+
// Add creation logging to tie in with the existing close logging
if (_logger.isInfoEnabled())
{
@@ -856,6 +856,10 @@ public abstract class AMQSession<C exten
//Check that we are clean to commit.
if (_failedOverDirty)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Session " + _channelId + " was dirty whilst failing over. Rolling back.");
+ }
rollback();
throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
@@ -890,7 +894,7 @@ public abstract class AMQSession<C exten
C consumer = _consumers.get(consumerTag);
if (consumer != null)
{
- if (!consumer.isNoConsume()) // Normal Consumer
+ if (!consumer.isBrowseOnly()) // Normal Consumer
{
// Clean the Maps up first
// Flush any pending messages for this consumerTag
@@ -1092,7 +1096,7 @@ public abstract class AMQSession<C exten
// possible to determine when querying the broker whether there are no arguments or just a non-matching selector
// argument, as specifying null for the arguments when querying means they should not be checked at all
args.put(AMQPFilterTypes.JMS_SELECTOR.getValue().toString(), messageSelector == null ? "" : messageSelector);
-
+
// if the queue is bound to the exchange but NOT for this topic and selector, then the JMS spec
// says we must trash the subscription.
boolean isQueueBound = isQueueBound(dest.getExchangeName(), dest.getAMQQueueName());
@@ -1814,9 +1818,7 @@ public abstract class AMQSession<C exten
suspendChannel(true);
}
- // Let the dispatcher know that all the incomming messages
- // should be rolled back(reject/release)
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
syncDispatchQueue();
@@ -2008,28 +2010,11 @@ public abstract class AMQSession<C exten
AMQDestination amqd = (AMQDestination) destination;
- // TODO: Define selectors in AMQP
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = FieldTableFactory.newFieldTable();
- // if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- // rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
- {
- ft.addAll(rawSelector);
- }
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
- // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
- // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
- // argument, as specifying null for the arguments when querying means they should not be checked at all
- ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
-
C consumer;
try
{
consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
}
catch(TransportException e)
{
@@ -2570,7 +2555,7 @@ public abstract class AMQSession<C exten
* @param queueName
*/
private void consumeFromQueue(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector) throws AMQException, FailoverException
{
int tagId = _nextTag++;
@@ -2598,7 +2583,7 @@ public abstract class AMQSession<C exten
}
public abstract void sendConsume(C consumer, AMQShortString queueName,
- AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
+ AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException;
private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
throws JMSException
@@ -2923,7 +2908,7 @@ public abstract class AMQSession<C exten
try
{
- consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer._messageSelector);
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelectorFilter());
}
catch (FailoverException e)
{
@@ -3202,13 +3187,13 @@ public abstract class AMQSession<C exten
setConnectionStopped(true);
}
- _rollbackMark.set(_highestDeliveryTag.get());
+ setRollbackMark();
_dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
for (C consumer : _consumers.values())
{
- if (!consumer.isNoConsume())
+ if (!consumer.isBrowseOnly())
{
consumer.rollback();
}
@@ -3351,6 +3336,11 @@ public abstract class AMQSession<C exten
if (!(message instanceof CloseConsumerMessage)
&& tagLE(deliveryTag, _rollbackMark.get()))
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message because delivery tag " + deliveryTag
+ + " <= rollback mark " + _rollbackMark.get());
+ }
rejectMessage(message, true);
}
else if (_usingDispatcherForCleanup)
@@ -3390,7 +3380,7 @@ public abstract class AMQSession<C exten
}
else
{
- if (consumer.isNoConsume())
+ if (consumer.isBrowseOnly())
{
_dispatcherLogger.info("Received a message("
+ System.identityHashCode(message) + ")" + "["
@@ -3412,6 +3402,11 @@ public abstract class AMQSession<C exten
// Don't reject if we're already closing
if (!_closed.get())
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting message with delivery tag " + message.getDeliveryTag()
+ + " for closing consumer " + String.valueOf(consumer == null? null: consumer._consumerTag));
+ }
rejectMessage(message, true);
}
}
@@ -3542,4 +3537,15 @@ public abstract class AMQSession<C exten
{
return ((destination instanceof AMQDestination) && ((AMQDestination)destination).isBrowseOnly());
}
+
+ private void setRollbackMark()
+ {
+ // Let the dispatcher know that all the incoming messages
+ // should be rolled back (reject/release)
+ _rollbackMark.set(_highestDeliveryTag.get());
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rollback mark is set to " + _rollbackMark.get());
+ }
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Mar 1 10:20:36 2012
@@ -53,6 +53,7 @@ import org.apache.qpid.client.messaging.
import org.apache.qpid.client.messaging.address.Node.QueueNode;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQConstant;
@@ -74,6 +75,7 @@ import org.apache.qpid.transport.Session
import org.apache.qpid.transport.SessionListener;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -294,23 +296,34 @@ public class AMQSession_0_10 extends AMQ
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
@@ -364,7 +377,7 @@ public class AMQSession_0_10 extends AMQ
_logger.debug("Binding queue : " + queue +
" exchange: " + exchange +
" using binding key " + binding.getBindingKey() +
- " with args " + printMap(binding.getArgs()));
+ " with args " + Strings.printMap(binding.getArgs()));
getQpidSession().exchangeBind(queue,
exchange,
binding.getBindingKey(),
@@ -496,13 +509,13 @@ public class AMQSession_0_10 extends AMQ
public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
- final FieldTable ft, final boolean noConsume,
+ final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+ _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
}
@@ -568,56 +581,30 @@ public class AMQSession_0_10 extends AMQ
* Registers the consumer with the broker
*/
public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
- boolean nowait, String messageSelector, int tag)
+ boolean nowait, MessageFilter messageSelector, int tag)
throws AMQException, FailoverException
{
- boolean preAcquire;
-
- long capacity = getCapacity(consumer.getDestination());
-
- try
- {
- boolean isTopic;
- Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
-
- if (consumer.getDestination().getDestSyntax() == AMQDestination.DestSyntax.BURL)
- {
- isTopic = consumer.getDestination() instanceof AMQTopic ||
- consumer.getDestination().getExchangeClass().equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS) ;
-
- preAcquire = isTopic || (!consumer.isNoConsume() &&
- (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")));
- }
- else
- {
- isTopic = consumer.getDestination().getAddressType() == AMQDestination.TOPIC_TYPE;
-
- preAcquire = !consumer.isNoConsume() &&
- (isTopic || consumer.getMessageSelector() == null ||
- consumer.getMessageSelector().equals(""));
-
- arguments.putAll(
- (Map<? extends String, ? extends Object>) consumer.getDestination().getLink().getSubscription().getArgs());
- }
-
- boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
-
- if (consumer.getDestination().getLink() != null)
- {
- acceptModeNone = consumer.getDestination().getLink().getReliability() == Link.Reliability.UNRELIABLE;
- }
-
- getQpidSession().messageSubscribe
- (queueName.toString(), String.valueOf(tag),
- acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
- preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
- consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
- }
- catch (JMSException e)
+ boolean preAcquire = consumer.isPreAcquire();
+
+ AMQDestination destination = consumer.getDestination();
+ long capacity = consumer.getCapacity();
+
+ Map<String, Object> arguments = FieldTable.convertToMap(consumer.getArguments());
+
+ Link link = destination.getLink();
+ if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null)
{
- throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
+ arguments.putAll((Map<? extends String, ? extends Object>) link.getSubscription().getArgs());
}
+ boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE;
+
+ getQpidSession().messageSubscribe
+ (queueName.toString(), String.valueOf(tag),
+ acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT,
+ preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments,
+ consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
+
String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
if (capacity == 0)
@@ -646,21 +633,6 @@ public class AMQSession_0_10 extends AMQ
}
}
- private long getCapacity(AMQDestination destination)
- {
- long capacity = 0;
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (prefetch())
- {
- capacity = getAMQConnection().getMaxPrefetch();
- }
- return capacity;
- }
-
/**
* Create an 0_10 message producer
*/
@@ -825,7 +797,7 @@ public class AMQSession_0_10 extends AMQ
//only set if msg list is null
try
{
- long capacity = getCapacity(consumer.getDestination());
+ long capacity = consumer.getCapacity();
if (capacity == 0)
{
@@ -969,17 +941,23 @@ public class AMQSession_0_10 extends AMQ
/**
* Store non committed messages for this session
- * With 0.10 messages are consumed with window mode, we must send a completion
- * before the window size is reached so credits don't dry up.
* @param id
*/
@Override protected void addDeliveredMessage(long id)
{
_txRangeSet.add((int) id);
_txSize++;
+ }
+
+ /**
+ * With 0.10 messages are consumed with window mode, we must send a completion
+ * before the window size is reached so credits don't dry up.
+ */
+ protected void sendTxCompletionsIfNecessary()
+ {
// this is a heuristic, we may want to have that configurable
- if (_connection.getMaxPrefetch() == 1 ||
- _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0)
+ if (_txSize > 0 && (_connection.getMaxPrefetch() == 1 ||
+ _connection.getMaxPrefetch() != 0 && _txSize % (_connection.getMaxPrefetch() / 2) == 0))
{
// send completed so consumer credits don't dry up
messageAcknowledge(_txRangeSet, false);
@@ -1168,8 +1146,8 @@ public class AMQSession_0_10 extends AMQ
boolean isConsumer,
boolean noWait) throws AMQException
{
- if (dest.isAddressResolved())
- {
+ if (dest.isAddressResolved() && dest.isResolvedAfter(_connection.getLastFailoverTime()))
+ {
if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType())
{
createSubscriptionQueue(dest);
@@ -1189,22 +1167,6 @@ public class AMQSession_0_10 extends AMQ
int type = resolveAddressType(dest);
- if (type == AMQDestination.QUEUE_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.AT_LEAST_ONCE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.UNSPECIFIED)
- {
- dest.getLink().setReliability(Reliability.UNRELIABLE);
- }
- else if (type == AMQDestination.TOPIC_TYPE &&
- dest.getLink().getReliability() == Reliability.AT_LEAST_ONCE)
- {
- throw new AMQException("AT-LEAST-ONCE is not yet supported for Topics");
- }
-
switch (type)
{
case AMQDestination.QUEUE_TYPE:
@@ -1258,7 +1220,7 @@ public class AMQSession_0_10 extends AMQ
"The name '" + dest.getAddressName() +
"' supplied in the address doesn't resolve to an exchange or a queue");
}
- dest.setAddressResolved(true);
+ dest.setAddressResolved(System.currentTimeMillis());
}
}
@@ -1352,22 +1314,6 @@ public class AMQSession_0_10 extends AMQ
dest.setRoutingKey(new AMQShortString(dest.getSubject()));
}
- /** This should be moved to a suitable utility class */
- private String printMap(Map<String,Object> map)
- {
- StringBuilder sb = new StringBuilder();
- sb.append("<");
- if (map != null)
- {
- for(String key : map.keySet())
- {
- sb.append(key).append(" = ").append(map.get(key)).append(" ");
- }
- }
- sb.append(">");
- return sb.toString();
- }
-
protected void acknowledgeImpl()
{
RangeSet range = gatherUnackedRangeSet();
@@ -1378,4 +1324,15 @@ public class AMQSession_0_10 extends AMQ
getQpidSession().sync();
}
}
+
+ @Override
+ void resubscribe() throws AMQException
+ {
+ // Also reset the delivery tag tracker, to ensure we don't
+ // return the first <total number of msgs received on session>
+ // messages sent by the brokers following the first rollback
+ // after failover
+ _highestDeliveryTag.set(-1);
+ super.resubscribe();
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Mar 1 10:20:36 2012
@@ -41,6 +41,7 @@ import org.apache.qpid.client.state.AMQS
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
@@ -333,24 +334,9 @@ public final class AMQSession_0_8 extend
AMQShortString queueName,
AMQProtocolHandler protocolHandler,
boolean nowait,
- String messageSelector,
+ MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if ((messageSelector != null) && !messageSelector.equals(""))
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isNoConsume())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -359,7 +345,7 @@ public final class AMQSession_0_8 extend
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- arguments);
+ consumer.getArguments());
AMQFrame jmsConsume = body.generateFrame(_channelId);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar 1 10:20:36 2012
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -31,6 +35,7 @@ import org.apache.qpid.transport.Transpo
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -52,7 +57,7 @@ public abstract class BasicMessageConsum
/** The connection being used by this consumer */
protected final AMQConnection _connection;
- protected final String _messageSelector;
+ protected final MessageFilter _messageSelectorFilter;
private final boolean _noLocal;
@@ -138,7 +143,7 @@ public abstract class BasicMessageConsum
*/
private final boolean _autoClose;
- private final boolean _noConsume;
+ private final boolean _browseOnly;
private List<StackTraceElement> _closedStack = null;
@@ -146,28 +151,44 @@ public abstract class BasicMessageConsum
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
_connection = connection;
- _messageSelector = messageSelector;
_noLocal = noLocal;
_destination = destination;
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
_synchronousQueue = new LinkedBlockingQueue();
_autoClose = autoClose;
- _noConsume = noConsume;
+ _browseOnly = browseOnly;
+
+ try
+ {
+ if (messageSelector == null || "".equals(messageSelector.trim()))
+ {
+ _messageSelectorFilter = null;
+ }
+ else
+ {
+ _messageSelectorFilter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (final AMQInternalException ie)
+ {
+ InvalidSelectorException ise = new InvalidSelectorException("cannot create consumer because of selector issue");
+ ise.setLinkedException(ie);
+ throw ise;
+ }
// Force queue browsers not to use acknowledge modes.
- if (_noConsume)
+ if (_browseOnly)
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
@@ -175,6 +196,21 @@ public abstract class BasicMessageConsum
{
_acknowledgeMode = acknowledgeMode;
}
+
+ final FieldTable ft = FieldTableFactory.newFieldTable();
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
+ {
+ ft.addAll(rawSelector);
+ }
+
+ // We must always send the selector argument, even if empty, so that when a selector is removed from a durable
+ // topic subscription we can tell that the broker arguments no longer match. Otherwise it would not be possible
+ // to determine, when querying the broker, whether there are no arguments or just a non-matching selector
+ // argument, because specifying null for the arguments when querying means they should not be checked at all.
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+
+ _arguments = ft;
}
public AMQDestination getDestination()
@@ -186,7 +222,7 @@ public abstract class BasicMessageConsum
{
checkPreConditions();
- return _messageSelector;
+ return _messageSelectorFilter == null ? null :_messageSelectorFilter.getSelector();
}
public MessageListener getMessageListener() throws JMSException
@@ -345,6 +381,11 @@ public abstract class BasicMessageConsum
return _receiving.get();
}
+ public MessageFilter getMessageSelectorFilter()
+ {
+ return _messageSelectorFilter;
+ }
+
public Message receive() throws JMSException
{
return receive(0);
@@ -874,9 +915,9 @@ public abstract class BasicMessageConsum
return _autoClose;
}
- public boolean isNoConsume()
+ public boolean isBrowseOnly()
{
- return _noConsume;
+ return _browseOnly;
}
public void rollback()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Mar 1 10:20:36 2012
@@ -20,22 +20,20 @@ package org.apache.qpid.client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.client.AMQDestination.AddressOption;
-import org.apache.qpid.client.AMQDestination.DestSyntax;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.*;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.jms.Session;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
+
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,11 +49,6 @@ public class BasicMessageConsumer_0_10 e
protected final Logger _logger = LoggerFactory.getLogger(getClass());
/**
- * The message selector filter associated with this consumer message selector
- */
- private MessageFilter _filter = null;
-
- /**
* The underlying QpidSession
*/
private AMQSession_0_10 _0_10session;
@@ -63,7 +56,7 @@ public class BasicMessageConsumer_0_10 e
/**
* Indicates whether this consumer receives pre-acquired messages
*/
- private boolean _preAcquire = true;
+ private final boolean _preAcquire;
/**
* Specify whether this consumer is performing a sync receive
@@ -71,44 +64,27 @@ public class BasicMessageConsumer_0_10 e
private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
private String _consumerTagString;
- private long capacity = 0;
+ private final long _capacity;
+
+ /** Flag indicating if the server supports message selectors */
+ protected final boolean _serverJmsSelectorSupport;
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
- AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
- if (messageSelector != null && !messageSelector.equals(""))
- {
- try
- {
- _filter = new JMSSelectorFilter(messageSelector);
- }
- catch (AMQInternalException e)
- {
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
- }
- if (destination instanceof AMQQueue)
- {
- _preAcquire = false;
- }
- }
-
- // Destination setting overrides connection defaults
- if (destination.getDestSyntax() == DestSyntax.ADDR &&
- destination.getLink().getConsumerCapacity() > 0)
- {
- capacity = destination.getLink().getConsumerCapacity();
- }
- else if (getSession().prefetch())
- {
- capacity = _0_10session.getAMQConnection().getMaxPrefetch();
- }
+
+ _preAcquire = evaluatePreAcquire(browseOnly, destination);
+
+ _capacity = evaluateCapacity(destination);
+ _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+
if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType())
{
@@ -122,7 +98,6 @@ public class BasicMessageConsumer_0_10 e
}
}
-
@Override public void setConsumerTag(int consumerTag)
{
super.setConsumerTag(consumerTag);
@@ -148,15 +123,22 @@ public class BasicMessageConsumer_0_10 e
{
if (checkPreConditions(jmsMessage))
{
- if (isMessageListenerSet() && capacity == 0)
+ if (isMessageListenerSet() && _capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
_logger.debug("messageOk, trying to notify");
super.notifyMessage(jmsMessage);
}
+ else
+ {
+ // if we are synchronously waiting for a message
+ // and messages are not pre-fetched we then need to request another one
+ if(_capacity == 0)
+ {
+ messageFlow();
+ }
+ }
}
catch (AMQException e)
{
@@ -227,12 +209,11 @@ public class BasicMessageConsumer_0_10 e
private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
- // TODO Use a tag for fiding out if message filtering is done here or by the broker.
try
{
- if (_messageSelector != null && !_messageSelector.equals(""))
+ if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
{
- messageOk = _filter.matches(message);
+ messageOk = _messageSelectorFilter.matches(message);
}
}
catch (Exception e)
@@ -245,6 +226,7 @@ public class BasicMessageConsumer_0_10 e
_logger.debug("messageOk " + messageOk);
_logger.debug("_preAcquire " + _preAcquire);
}
+
if (!messageOk)
{
if (_preAcquire)
@@ -261,23 +243,15 @@ public class BasicMessageConsumer_0_10 e
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Message not OK, releasing");
+ _logger.debug("filterMessage - not ack'ing message as not acquired");
}
- releaseMessage(message);
- }
- // if we are syncrhonously waiting for a message
- // and messages are not prefetched we then need to request another one
- if(capacity == 0)
- {
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ flushUnwantedMessage(message);
}
}
- // now we need to acquire this message if needed
- // this is the case of queue with a message selector set
- if (!_preAcquire && messageOk && !isNoConsume())
+ else if (!_preAcquire && !isBrowseOnly())
{
+ // the message was not pre-acquired, so we need to acquire it now;
+ // this is the case for a queue with a message selector set
if (_logger.isDebugEnabled())
{
_logger.debug("filterMessage - trying to acquire message");
@@ -285,6 +259,7 @@ public class BasicMessageConsumer_0_10 e
messageOk = acquireMessage(message);
_logger.debug("filterMessage - message acquire status : " + messageOk);
}
+
return messageOk;
}
@@ -295,38 +270,38 @@ public class BasicMessageConsumer_0_10 e
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
+ private void acknowledgeMessage(final AbstractJMSMessage message) throws AMQException
{
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.messageAcknowledge
- (ranges,
- _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.messageAcknowledge
+ (ranges,
+ _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
- AMQException amqe = _0_10session.getCurrentException();
- if (amqe != null)
- {
- throw amqe;
- }
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
}
}
/**
- * Release a message
+ * Flush an unwanted message. For 0-10 we need to ensure that every message is indicated as
+ * processed so that its AMQP command-id is marked completed.
*
- * @param message The message to be released
- * @throws AMQException If the message cannot be released due to some internal error.
+ * @param message The unwanted message to be flushed
+ * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
*/
- private void releaseMessage(AbstractJMSMessage message) throws AMQException
+ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
{
- if (_preAcquire)
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.flushProcessed(ranges,false);
+
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
{
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
- _0_10session.getQpidSession().messageRelease(ranges);
- _0_10session.sync();
+ throw amqe;
}
}
@@ -337,36 +312,37 @@ public class BasicMessageConsumer_0_10 e
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
+ private boolean acquireMessage(final AbstractJMSMessage message) throws AMQException
{
boolean result = false;
- if (!_preAcquire)
- {
- RangeSet ranges = new RangeSet();
- ranges.add((int) message.getDeliveryTag());
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
- Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
+ final Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
- RangeSet acquired = acq.getTransfers();
- if (acquired != null && acquired.size() > 0)
- {
- result = true;
- }
+ final RangeSet acquired = acq.getTransfers();
+ if (acquired != null && acquired.size() > 0)
+ {
+ result = true;
}
return result;
}
+ private void messageFlow()
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
+ MessageCreditUnit.MESSAGE, 1,
+ Option.UNRELIABLE);
+ }
public void setMessageListener(final MessageListener messageListener) throws JMSException
{
super.setMessageListener(messageListener);
try
{
- if (messageListener != null && capacity == 0)
+ if (messageListener != null && _capacity == 0)
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
if (messageListener != null && !_synchronousQueue.isEmpty())
{
@@ -389,9 +365,7 @@ public class BasicMessageConsumer_0_10 e
{
if (_0_10session.isStarted() && _syncReceive.get())
{
- _0_10session.getQpidSession().messageFlow
- (getConsumerTagString(), MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
}
@@ -406,15 +380,13 @@ public class BasicMessageConsumer_0_10 e
*/
public Object getMessageFromQueue(long l) throws InterruptedException
{
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(true);
}
- if (_0_10session.isStarted() && capacity == 0 && _synchronousQueue.isEmpty())
+ if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
{
- _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
- MessageCreditUnit.MESSAGE, 1,
- Option.UNRELIABLE);
+ messageFlow();
}
Object o = super.getMessageFromQueue(l);
if (o == null && _0_10session.isStarted())
@@ -427,18 +399,18 @@ public class BasicMessageConsumer_0_10 e
(getConsumerTagString(), MessageCreditUnit.BYTE,
0xFFFFFFFF, Option.UNRELIABLE);
- if (capacity > 0)
+ if (_capacity > 0)
{
_0_10session.getQpidSession().messageFlow
(getConsumerTagString(),
MessageCreditUnit.MESSAGE,
- capacity,
+ _capacity,
Option.UNRELIABLE);
}
_0_10session.syncDispatchQueue();
o = super.getMessageFromQueue(-1);
}
- if (capacity == 0)
+ if (_capacity == 0)
{
_syncReceive.set(false);
}
@@ -448,16 +420,26 @@ public class BasicMessageConsumer_0_10 e
void postDeliver(AbstractJMSMessage msg)
{
super.postDeliver(msg);
- if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE && !_session.isInRecovery())
+
+ switch (_acknowledgeMode)
{
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ case Session.SESSION_TRANSACTED:
+ _0_10session.sendTxCompletionsIfNecessary();
+ break;
+ case Session.NO_ACKNOWLEDGE:
+ if (!_session.isInRecovery())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
+ case Session.AUTO_ACKNOWLEDGE:
+ if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+ {
+ ((AMQSession_0_10) getSession()).getQpidSession().sync();
+ }
+ break;
}
- if (_acknowledgeMode == org.apache.qpid.jms.Session.AUTO_ACKNOWLEDGE &&
- !_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
- {
- ((AMQSession_0_10) getSession()).getQpidSession().sync();
- }
}
Message receiveBrowse() throws JMSException
@@ -526,4 +508,51 @@ public class BasicMessageConsumer_0_10 e
}
}
}
+
+ long getCapacity()
+ {
+ return _capacity;
+ }
+
+ boolean isPreAcquire()
+ {
+ return _preAcquire;
+ }
+
+ private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+ {
+ boolean preAcquire;
+ if (browseOnly)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
+ if (isQueue && getMessageSelectorFilter() != null)
+ {
+ preAcquire = false;
+ }
+ else
+ {
+ preAcquire = true;
+ }
+ }
+ return preAcquire;
+ }
+
+ private long evaluateCapacity(AMQDestination destination)
+ {
+ long capacity = 0;
+ if (destination.getLink() != null && destination.getLink().getConsumerCapacity() > 0)
+ {
+ capacity = destination.getLink().getConsumerCapacity();
+ }
+ else if (getSession().prefetch())
+ {
+ capacity = _0_10session.getAMQConnection().getMaxPrefetch();
+ }
+ return capacity;
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Mar 1 10:20:36 2012
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.client;
-import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,24 +38,23 @@ public class BasicMessageConsumer_0_8 ex
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
+ AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
- acknowledgeMode, noConsume, autoClose);
- try
+ protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
+ acknowledgeMode, browseOnly, autoClose);
+ final FieldTable consumerArguments = getArguments();
+ if (isAutoClose())
{
-
- if (messageSelector != null && messageSelector.length() > 0)
- {
- JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
- }
+ consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
}
- catch (AMQInternalException e)
+
+ if (isBrowseOnly())
{
- throw new InvalidSelectorException("cannot create consumer because of selector issue");
+ consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
}
+
}
void sendCancel() throws AMQException, FailoverException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Mar 1 10:20:36 2012
@@ -238,7 +238,7 @@ public class BasicMessageProducer_0_10 e
}
catch (Exception e)
{
- JMSException jmse = new JMSException("Exception when sending message");
+ JMSException jmse = new JMSException("Exception when sending message:" + e.getMessage());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java Thu Mar 1 10:20:36 2012
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.client.messaging.address;
-import static org.apache.qpid.client.messaging.address.Link.Reliability.UNSPECIFIED;
-
import java.util.HashMap;
import java.util.Map;
-import org.apache.qpid.client.messaging.address.Node.QueueNode;
-
public class Link
{
public enum FilterType { SQL92, XQUERY, SUBJECT }
- public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE, UNSPECIFIED }
+ public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
protected String name;
protected String _filter;
@@ -42,7 +38,7 @@ public class Link
protected int _producerCapacity = 0;
protected Node node;
protected Subscription subscription;
- protected Reliability reliability = UNSPECIFIED;
+ protected Reliability reliability = Reliability.AT_LEAST_ONCE;
public Reliability getReliability()
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Mar 1 10:20:36 2012
@@ -47,6 +47,7 @@ import org.apache.qpid.framing.ProtocolV
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -362,7 +363,15 @@ public class AMQProtocolSession implemen
public void closeProtocolSession() throws AMQException
{
- _protocolHandler.closeConnection(0);
+ try
+ {
+ _protocolHandler.getNetworkConnection().close();
+ }
+ catch(TransportException e)
+ {
+ //ignore such exceptions, they were already logged
+ //and this is a forcible close.
+ }
}
public void failover(String host, int port)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/JMSSelectorFilter.java Thu Mar 1 10:20:36 2012
@@ -26,20 +26,21 @@ import org.slf4j.LoggerFactory;
public class JMSSelectorFilter implements MessageFilter
{
- /**
- * this JMSSelectorFilter's logger
- */
private static final Logger _logger = LoggerFactory.getLogger(JMSSelectorFilter.class);
- private String _selector;
- private BooleanExpression _matcher;
+ private final String _selector;
+ private final BooleanExpression _matcher;
public JMSSelectorFilter(String selector) throws AMQInternalException
{
+ if (selector == null || "".equals(selector))
+ {
+ throw new IllegalArgumentException("Cannot create a JMSSelectorFilter with a null or empty selector string");
+ }
_selector = selector;
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug("Created JMSSelectorFilter with selector:" + _selector);
+ _logger.debug("Created JMSSelectorFilter with selector:" + _selector);
}
_matcher = new SelectorParser().parse(selector);
}
@@ -49,16 +50,15 @@ public class JMSSelectorFilter implement
try
{
boolean match = _matcher.matches(message);
- if (JMSSelectorFilter._logger.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- JMSSelectorFilter._logger.debug(message + " match(" + match + ") selector(" + System
- .identityHashCode(_selector) + "):" + _selector);
+ _logger.debug(message + " match(" + match + ") selector(" + _selector + "): " + _selector);
}
return match;
}
catch (AMQInternalException e)
{
- JMSSelectorFilter._logger.warn("Caght exception when evaluating message selector for message " + message, e);
+ _logger.warn("Caught exception when evaluating message selector for message " + message, e);
}
return false;
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/filter/MessageFilter.java Thu Mar 1 10:20:36 2012
@@ -17,11 +17,11 @@
*/
package org.apache.qpid.filter;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.client.message.AbstractJMSMessage;
public interface MessageFilter
{
- boolean matches(AbstractJMSMessage message) throws AMQInternalException;
+ boolean matches(AbstractJMSMessage message);
+ String getSelector();
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java Thu Mar 1 10:20:36 2012
@@ -29,7 +29,6 @@ import javax.jms.MessageProducer;
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Connection.SessionFactory;
@@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
}
catch (Exception e)
@@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
fail("JMSException should be thrown");
@@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
}
@@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receiveNoWait();
fail("JMSException should be thrown");
@@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
fail("JMSException should be thrown");
}
@@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
}
catch (Exception e)
@@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
}
catch (Exception e)
@@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
fail("JMSException should be thrown");
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/MessageConverterTest.java Thu Mar 1 10:20:36 2012
@@ -47,12 +47,17 @@ public class MessageConverterTest extend
protected JMSTextMessage testTextMessage;
protected JMSMapMessage testMapMessage;
- private AMQSession _session = new TestAMQSession();
+ private AMQConnection _connection;
+ private AMQSession _session;
protected void setUp() throws Exception
{
super.setUp();
+
+ _connection = new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'");
+ _session = new TestAMQSession(_connection);
+
testTextMessage = new JMSTextMessage(AMQMessageDelegateFactory.FACTORY_0_8);
//Set Message Text
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Mar 1 10:20:36 2012
@@ -29,22 +29,25 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.BasicMessageConsumer_0_8;
import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.MockAMQConnection;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
- public TestAMQSession()
+ public TestAMQSession(AMQConnection connection)
{
- super(null, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
+ super(connection, 0, false, AUTO_ACKNOWLEDGE, null, 0, 0);
}
public void acknowledgeMessage(long deliveryTag, boolean multiple)
@@ -124,7 +127,7 @@ public class TestAMQSession extends AMQS
return false;
}
- public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, MessageFilter messageSelector, int tag) throws AMQException, FailoverException
{
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common.xml Thu Mar 1 10:20:36 2012
@@ -23,7 +23,7 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.13"/>
+ <property name="project.version" value="0.15"/>
<property name="project.url" value="http://qpid.apache.org"/>
<property name="project.groupid" value="org.apache.qpid"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/common.bnd Thu Mar 1 10:20:36 2012
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.15.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Thu Mar 1 10:20:36 2012
@@ -832,9 +832,12 @@ public class FieldTable
public void addAll(FieldTable fieldTable)
{
initMapIfNecessary();
- _encodedForm = null;
- _properties.putAll(fieldTable._properties);
- recalculateEncodedSize();
+ if (fieldTable._properties != null)
+ {
+ _encodedForm = null;
+ _properties.putAll(fieldTable._properties);
+ recalculateEncodedSize();
+ }
}
public static Map<String, Object> convertToMap(final FieldTable fieldTable)
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Thu Mar 1 10:20:36 2012
@@ -526,10 +526,6 @@ public class Connection extends Connecti
{
synchronized (lock)
{
- for (Session ssn : channels.values())
- {
- ssn.closeCode(close);
- }
ConnectionCloseCode code = close.getReplyCode();
if (code != ConnectionCloseCode.NORMAL)
{
@@ -701,8 +697,17 @@ public class Connection extends Connecti
return channels.values();
}
- public boolean hasSessionWithName(final String name)
+ public boolean hasSessionWithName(final byte[] name)
{
- return sessions.containsKey(new Binary(name.getBytes()));
+ return sessions.containsKey(new Binary(name));
+ }
+
+ public void notifyFailoverRequired()
+ {
+ List<Session> values = new ArrayList<Session>(channels.values());
+ for (Session ssn : values)
+ {
+ ssn.notifyFailoverRequired();
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1295495&r1=1295494&r2=1295495&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Thu Mar 1 10:20:36 2012
@@ -195,10 +195,17 @@ public class ServerDelegate extends Conn
@Override
public void sessionAttach(Connection conn, SessionAttach atc)
{
+ sessionAttachImpl(conn, atc);
+ }
+
+ protected Session sessionAttachImpl(Connection conn, SessionAttach atc)
+ {
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
+
+ return ssn;
}
protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org