You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/26 22:04:39 UTC
svn commit: r490372 [2/3] - in
/incubator/qpid/branches/new_persistence/java: ./ broker/ broker/etc/
broker/src/main/grammar/ broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/filter/
broker/src/main/java/org/apac...
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,8 @@
import org.apache.qpid.AMQException;
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws AMQException;
@@ -29,4 +31,18 @@
boolean isSuspended();
void queueDeleted(AMQQueue queue) throws AMQException;
+
+ boolean hasFilters();
+
+ boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
+ boolean isAutoClose();
+
+ void close();
+
+ boolean isBrowser();
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionFactory.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,7 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
/**
* Allows the customisation of the creation of a subscription. This is typically done within an AMQQueue. This
@@ -32,9 +33,10 @@
*/
public interface SubscriptionFactory
{
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException;
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks,
+ FieldTable filters, boolean noLocal) throws AMQException;
- Subscription createSubscription(int channel, AMQProtocolSession protocolSession,String consumerTag)
- throws AMQException;
+
+ Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
+ throws AMQException;
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Dec 26 13:04:28 2006
@@ -22,8 +22,21 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.mina.common.ByteBuffer;
+
+import java.util.Queue;
/**
* Encapsulation of a supscription to a queue.
@@ -44,23 +57,30 @@
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+ private final boolean _noLocal;
+
/**
* True if messages need to be acknowledged
*/
private final boolean _acks;
+ private FilterManager _filters;
+ private final boolean _isBrowser;
+ private final Boolean _autoClose;
+ private boolean _closed = false;
public static class Factory implements SubscriptionFactory
{
- public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks)
- throws AMQException
+ public Subscription createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag, boolean acks, FieldTable filters, boolean noLocal) throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag, acks);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, acks, filters, noLocal);
}
public SubscriptionImpl createSubscription(int channel, AMQProtocolSession protocolSession, String consumerTag)
throws AMQException
{
- return new SubscriptionImpl(channel, protocolSession, consumerTag);
+ return new SubscriptionImpl(channel, protocolSession, consumerTag, false, null, false);
}
}
@@ -68,6 +88,13 @@
String consumerTag, boolean acks)
throws AMQException
{
+ this(channelId, protocolSession, consumerTag, acks, null, false);
+ }
+
+ public SubscriptionImpl(int channelId, AMQProtocolSession protocolSession,
+ String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
+ throws AMQException
+ {
AMQChannel channel = protocolSession.getChannel(channelId);
if (channel == null)
{
@@ -79,8 +106,61 @@
this.consumerTag = consumerTag;
sessionKey = protocolSession.getKey();
_acks = acks;
+ _noLocal = noLocal;
+
+ _filters = FilterManagerFactory.createManager(filters);
+
+
+ if (_filters != null)
+ {
+ Object isBrowser = filters.get(AMQPFilterTypes.NO_CONSUME.getValue());
+ if (isBrowser != null)
+ {
+ _isBrowser = (Boolean) isBrowser;
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+ }
+ else
+ {
+ _isBrowser = false;
+ }
+
+
+ if (_filters != null)
+ {
+ Object autoClose = filters.get(AMQPFilterTypes.AUTO_CLOSE.getValue());
+ if (autoClose != null)
+ {
+ _autoClose = (Boolean) autoClose;
+ }
+ else
+ {
+ _autoClose = false;
+ }
+ }
+ else
+ {
+ _autoClose = false;
+ }
+
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+
+
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
+
public SubscriptionImpl(int channel, AMQProtocolSession protocolSession,
String consumerTag)
throws AMQException
@@ -125,6 +205,44 @@
{
if (msg != null)
{
+ if (_isBrowser)
+ {
+ sendToBrowser(msg, queue);
+ }
+ else
+ {
+ sendToConsumer(msg, queue);
+ }
+ }
+ else
+ {
+ _logger.error("Attempt to send Null message", new NullPointerException());
+ }
+ }
+
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
+ {
+ // We don't decrement the reference here as we don't want to consume the message
+ // but we do want to send it to the client.
+
+ synchronized(channel)
+ {
+ long deliveryTag = channel.getNextDeliveryTag();
+
+ // We don't need to add the message to the unacknowledgedMap as we don't need to know if the client
+ // received the message. If it is lost in transit that is not important.
+ if (_acks)
+ {
+ channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+ }
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
+ }
+ }
+
+ private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws AMQException
+ {
+ try
+ {
// if we do not need to wait for client acknowledgements
// we can decrement the reference count immediately.
@@ -150,9 +268,9 @@
msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
- else
+ finally
{
- _logger.error("Attempt to send Null message", new NullPointerException());
+ msg.setDeliveredToConsumer();
}
}
@@ -169,5 +287,111 @@
public void queueDeleted(AMQQueue queue) throws AMQException
{
channel.queueDeleted(queue);
+ }
+
+ public boolean hasFilters()
+ {
+ return _filters != null;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ if (_noLocal)
+ {
+ // We don't want local messages so check to see if message is one we sent
+ if (protocolSession.getClientProperties().get(ClientProperties.instance.toString()).equals(
+ msg.getPublisher().getClientProperties().get(ClientProperties.instance.toString())))
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no interest as it is a local message(" +
+ System.identityHashCode(msg) + ")");
+ }
+ return false;
+ }
+ else // if not then filter the message.
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") local message(" + System.identityHashCode(msg) +
+ ") but not ours so filtering");
+ }
+ return checkFilters(msg);
+ }
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") checking filters for message (" + System.identityHashCode(msg));
+ }
+ return checkFilters(msg);
+ }
+ }
+
+ private boolean checkFilters(AMQMessage msg)
+ {
+ if (_filters != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has filters.");
+ }
+ return _filters.allAllow(msg);
+ }
+ else
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("(" + System.identityHashCode(this) + ") has no filters");
+ }
+
+ return true;
+ }
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+ public void close()
+ {
+ if (!_closed)
+ {
+ _logger.info("Closing autoclose subscription:" + this);
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ _closed = true;
+ }
+ }
+
+ public boolean isBrowser()
+ {
+ return _isBrowser;
+ }
+
+
+ private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ {
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
+ deliveryTag, false, exchange,
+ routingKey);
+ ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Tue Dec 26 13:04:28 2006
@@ -60,6 +60,7 @@
/**
* Remove the subscription, returning it if it was found
+ *
* @param subscription
* @return null if no match was found
*/
@@ -92,7 +93,7 @@
/**
* Return the next unsuspended subscription or null if not found.
- *
+ * <p/>
* Performance note:
* This method can scan all items twice when looking for a subscription that is not
* suspended. The worst case occcurs when all subscriptions are suspended. However, it is does this
@@ -107,31 +108,51 @@
return null;
}
- try {
- final Subscription result = nextSubscriber();
- if (result == null) {
+ try
+ {
+ final Subscription result = nextSubscriberImpl(msg);
+ if (result == null)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
- } else {
+ return nextSubscriberImpl(msg);
+ }
+ else
+ {
return result;
}
- } catch (IndexOutOfBoundsException e) {
+ }
+ catch (IndexOutOfBoundsException e)
+ {
_currentSubscriber = 0;
- return nextSubscriber();
+ return nextSubscriber(msg);
}
}
- private Subscription nextSubscriber()
+ private Subscription nextSubscriberImpl(AMQMessage msg)
{
final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
- while (iterator.hasNext()) {
+ while (iterator.hasNext())
+ {
Subscription subscription = iterator.next();
++_currentSubscriber;
subscriberScanned();
- if (!subscription.isSuspended()) {
- return subscription;
+
+ if (!subscription.isSuspended())
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a message
+ // OR have a clean up thread that runs the PDQs expunging the messages.
+ if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
}
}
+
return null;
}
@@ -147,11 +168,19 @@
return _subscriptions.isEmpty();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
+ }
+
public boolean hasActiveSubscribers()
{
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) return true;
+ if (!s.isSuspended())
+ {
+ return true;
+ }
}
return false;
}
@@ -161,7 +190,10 @@
int count = 0;
for (Subscription s : _subscriptions)
{
- if (!s.isSuspended()) count++;
+ if (!s.isSuspended())
+ {
+ count++;
+ }
}
return count;
}
@@ -169,6 +201,7 @@
/**
* Notification that a queue has been deleted. This is called so that the subscription can inform the
* channel, which in turn can update its list of unacknowledged messages.
+ *
* @param queue
*/
public void queueDeleted(AMQQueue queue) throws AMQException
@@ -179,7 +212,8 @@
}
}
- int size() {
+ int size()
+ {
return _subscriptions.size();
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Tue Dec 26 13:04:28 2006
@@ -35,7 +35,7 @@
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log = Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log = Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
@@ -124,6 +124,11 @@
return new ArrayList<AMQMessage>(_messages);
}
+ public void populatePreDeliveryQueue(Subscription subscription)
+ {
+ //no-op . This DM has no PreDeliveryQueues
+ }
+
public synchronized void removeAMessageFromTop() throws AMQException
{
AMQMessage msg = poll();
@@ -245,7 +250,6 @@
else
{
s.send(msg, _queue);
- msg.setDeliveredToConsumer();
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue Dec 26 13:04:28 2006
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
/**
* @author Apache Software Foundation
@@ -49,6 +50,8 @@
*/
private final List<RequiredDeliveryException> _returnMessages;
+ private Set<Long> _browsedAcks;
+
private final MessageStore _messageStore;
/**
@@ -57,11 +60,12 @@
private boolean _inTran;
public NonTransactionalContext(MessageStore messageStore, AMQChannel channel,
- List<RequiredDeliveryException> returnMessages)
+ List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
{
_channel = channel;
_returnMessages = returnMessages;
_messageStore = messageStore;
+ _browsedAcks = browsedAcks;
}
public void beginTranIfNecessary() throws AMQException
@@ -111,12 +115,19 @@
//Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
// tells the server to acknowledge all outstanding mesages.
_log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
- unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.size());
unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
public boolean callback(UnacknowledgedMessage message) throws AMQException
{
- message.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ message.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
return false;
}
@@ -137,7 +148,14 @@
unacknowledgedMessageMap.drainTo(acked, deliveryTag);
for (UnacknowledgedMessage msg : acked)
{
- msg.discard();
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ msg.discard();
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/util/CircularBuffer.java Tue Dec 26 13:04:28 2006
@@ -20,10 +20,15 @@
*/
package org.apache.qpid.server.util;
+import org.apache.log4j.Logger;
+
import java.util.Iterator;
public class CircularBuffer implements Iterable
{
+
+ private static final Logger _logger = Logger.getLogger(CircularBuffer.class);
+
private final Object[] _log;
private int _size;
private int _index;
@@ -102,7 +107,7 @@
{
for(Object o : this)
{
- System.out.println(o);
+ _logger.info(o);
}
}
@@ -120,7 +125,7 @@
for(String s : items)
{
buffer.add(s);
- System.out.println(buffer);
+ _logger.info(buffer);
}
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/pom.xml?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/client/pom.xml Tue Dec 26 13:04:28 2006
@@ -35,7 +35,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -96,6 +95,11 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Modified: incubator/qpid/branches/new_persistence/java/client/src/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/log4j.properties?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/log4j.properties (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/log4j.properties Tue Dec 26 13:04:28 2006
@@ -1,10 +1,10 @@
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=${amqj.logging.level}
log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=info
+log4j.appender.console.Threshold=debug
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/log4j.properties Tue Dec 26 13:04:28 2006
@@ -16,10 +16,10 @@
# specific language governing permissions and limitations
# under the License.
#
-log4j.rootLogger=WARN
+log4j.rootLogger=${amqj.logging.level}
-log4j.logger.org.apache.qpid=WARN, console
+log4j.logger.org.apache.qpid=${amqj.logging.level}, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Dec 26 13:04:28 2006
@@ -23,13 +23,15 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -69,15 +71,15 @@
private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
/**
- * Used to reference durable subscribers so they requests for unsubscribe can be handled
- * correctly. Note this only keeps a record of subscriptions which have been created
- * in the current instance. It does not remember subscriptions between executions of the
- * client
+ * Used to reference durable subscribers so they requests for unsubscribe can be handled
+ * correctly. Note this only keeps a record of subscriptions which have been created
+ * in the current instance. It does not remember subscriptions between executions of the
+ * client
*/
private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
- new ConcurrentHashMap<BasicMessageConsumer, String>();
+ new ConcurrentHashMap<BasicMessageConsumer, String>();
/**
* Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -143,6 +145,7 @@
private boolean _inRecovery;
+
/**
* Responsible for decoding a message fragment and passing it to the appropriate message consumer.
*/
@@ -176,7 +179,7 @@
{
if (message.deliverBody != null)
{
- final BasicMessageConsumer consumer = _consumers.get(message.deliverBody.consumerTag);
+ final BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(message.deliverBody.consumerTag);
if (consumer == null)
{
@@ -210,17 +213,15 @@
{
_connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage));
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage));
- }
- else
- {
- _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
- }
+ _connection.exceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage));
}
+
}
catch (Exception e)
{
@@ -318,7 +319,7 @@
public BytesMessage createBytesMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -334,7 +335,7 @@
public MapMessage createMapMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -350,7 +351,7 @@
public javax.jms.Message createMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -366,7 +367,7 @@
public ObjectMessage createObjectMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -382,7 +383,7 @@
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -400,7 +401,7 @@
public StreamMessage createStreamMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -417,7 +418,7 @@
public TextMessage createTextMessage() throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
@@ -434,7 +435,7 @@
public TextMessage createTextMessage(String text) throws JMSException
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
checkNotClosed();
try
@@ -504,7 +505,7 @@
{
// We must close down all producers and consumers in an orderly fashion. This is the only method
// that can be called from a different thread of control from the one controlling the session
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
@@ -569,7 +570,7 @@
*/
public void closed(Throwable e)
{
- synchronized (_connection.getFailoverMutex())
+ synchronized(_connection.getFailoverMutex())
{
// An AMQException has an error code and message already and will be passed in when closure occurs as a
// result of a channel close request
@@ -721,11 +722,11 @@
public void acknowledge() throws JMSException
{
- if(isClosed())
+ if (isClosed())
{
throw new IllegalStateException("Session is already closed");
}
- for(BasicMessageConsumer consumer : _consumers.values())
+ for (BasicMessageConsumer consumer : _consumers.values())
{
consumer.acknowledge();
}
@@ -734,7 +735,6 @@
}
-
public MessageListener getMessageListener() throws JMSException
{
checkNotClosed();
@@ -843,7 +843,9 @@
false,
false,
null,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
@@ -855,7 +857,9 @@
false,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
}
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
@@ -868,7 +872,26 @@
noLocal,
false,
messageSelector,
- null);
+ null,
+ false,
+ false);
+ }
+
+ public MessageConsumer createBrowserConsumer(Destination destination,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException
+ {
+ checkValidDestination(destination);
+ return createConsumerImpl(destination,
+ _defaultPrefetchHighMark,
+ _defaultPrefetchLowMark,
+ noLocal,
+ false,
+ messageSelector,
+ null,
+ true,
+ true);
}
public MessageConsumer createConsumer(Destination destination,
@@ -878,7 +901,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive, selector, null, false, false);
}
@@ -890,7 +913,7 @@
String selector) throws JMSException
{
checkValidDestination(destination);
- return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null);
+ return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive, selector, null, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -902,7 +925,7 @@
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetch, prefetch, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
public MessageConsumer createConsumer(Destination destination,
@@ -915,7 +938,7 @@
{
checkValidDestination(destination);
return createConsumerImpl(destination, prefetchHigh, prefetchLow, noLocal, exclusive,
- selector, rawSelector);
+ selector, rawSelector, false, false);
}
protected MessageConsumer createConsumerImpl(final Destination destination,
@@ -924,7 +947,9 @@
final boolean noLocal,
final boolean exclusive,
final String selector,
- final FieldTable rawSelector) throws JMSException
+ final FieldTable rawSelector,
+ final boolean noConsume,
+ final boolean autoClose) throws JMSException
{
checkTemporaryDestination(destination);
@@ -948,12 +973,18 @@
BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, _connection, amqd, selector, noLocal,
_messageFactoryRegistry, AMQSession.this,
protocolHandler, ft, prefetchHigh, prefetchLow, exclusive,
- _acknowledgeMode);
+ _acknowledgeMode, noConsume, autoClose);
try
{
registerConsumer(consumer, false);
}
+ catch (AMQInvalidSelectorException ise)
+ {
+ JMSException ex = new InvalidSelectorException(ise.getMessage());
+ ex.setLinkedException(ise);
+ throw ex;
+ }
catch (AMQException e)
{
JMSException ex = new JMSException("Error registering consumer: " + e);
@@ -963,7 +994,7 @@
synchronized(destination)
{
- _destinationConsumerCount.putIfAbsent(destination,new AtomicInteger());
+ _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
_destinationConsumerCount.get(destination).incrementAndGet();
}
@@ -975,16 +1006,16 @@
private void checkTemporaryDestination(Destination destination)
throws JMSException
{
- if((destination instanceof TemporaryDestination))
+ if ((destination instanceof TemporaryDestination))
{
_logger.debug("destination is temporary");
final TemporaryDestination tempDest = (TemporaryDestination) destination;
- if(tempDest.getSession() != this)
+ if (tempDest.getSession() != this)
{
_logger.debug("destination is on different session");
throw new JMSException("Cannot consume from a temporary destination created onanother session");
}
- if(tempDest.isDeleted())
+ if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot consume from a deleted destination");
@@ -1065,12 +1096,26 @@
* @return the consumer tag generated by the broker
*/
private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
- boolean nowait) throws AMQException
+ boolean nowait, String messageSelector) throws AMQException
{
//fixme prefetch values are not used here. Do we need to have them as parametsrs?
//need to generate a consumer tag on the client so we can exploit the nowait flag
String tag = Integer.toString(_nextTag++);
+ FieldTable arguments = FieldTableFactory.newFieldTable();
+ if (messageSelector != null && !messageSelector.equals(""))
+ {
+ arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+ }
+ if(consumer.isAutoClose())
+ {
+ arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+ if(consumer.isNoConsume())
+ {
+ arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
consumer.setConsumerTag(tag);
// we must register the consumer in the map before we actually start listening
_consumers.put(tag, consumer);
@@ -1080,7 +1125,7 @@
AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
queueName, tag, consumer.isNoLocal(),
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
- consumer.isExclusive(), nowait, null);
+ consumer.isExclusive(), nowait, arguments);
if (nowait)
{
protocolHandler.writeFrame(jmsConsume);
@@ -1220,7 +1265,7 @@
{
checkNotClosed();
checkValidTopic(topic);
- AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic)topic, name, _connection);
+ AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
@@ -1247,8 +1292,8 @@
subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1278,8 +1323,8 @@
AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
- _subscriptions.put(name,subscriber);
- _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
+ _subscriptions.put(name, subscriber);
+ _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
return subscriber;
}
@@ -1291,16 +1336,14 @@
public QueueBrowser createBrowser(Queue queue) throws JMSException
{
- checkNotClosed();
- checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return createBrowser(queue, null);
}
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
{
checkNotClosed();
checkValidQueue(queue);
- throw new UnsupportedOperationException("Queue browsing not supported");
+ return new AMQQueueBrowser(this, (AMQQueue) queue,messageSelector);
}
public TemporaryQueue createTemporaryQueue() throws JMSException
@@ -1476,7 +1519,14 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- consumeFromQueue(consumer, queueName, protocolHandler, nowait);
+ try
+ {
+ consumeFromQueue(consumer, queueName, protocolHandler, nowait, consumer.getMessageSelector());
+ }
+ catch (JMSException e) //thrown by getMessageSelector
+ {
+ throw new AMQException(e.getMessage(), e);
+ }
}
/**
@@ -1489,7 +1539,7 @@
{
_consumers.remove(consumer.getConsumerTag());
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
- if(subscriptionName != null)
+ if (subscriptionName != null)
{
_subscriptions.remove(subscriptionName);
}
@@ -1497,7 +1547,7 @@
Destination dest = consumer.getDestination();
synchronized(dest)
{
- if(_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
{
_destinationConsumerCount.remove(dest);
}
@@ -1567,6 +1617,16 @@
_connection.getProtocolHandler().writeFrame(channelFlowFrame);
}
+ public void confirmConsumerCancelled(String consumerTag)
+ {
+ BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
+ if((consumer != null) && (consumer.isAutoClose()))
+ {
+ consumer.closeWhenNoMessages(true);
+ }
+ }
+
+
/*
* I could have combined the last 3 methods, but this way it improves readability
*/
@@ -1576,7 +1636,7 @@
{
throw new javax.jms.InvalidDestinationException("Invalid Topic");
}
- if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+ if ((topic instanceof TemporaryDestination) && ((TemporaryDestination) topic).getSession() != this)
{
throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
}
@@ -1597,4 +1657,5 @@
throw new javax.jms.InvalidDestinationException("Invalid Queue");
}
}
+
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Dec 26 13:04:28 2006
@@ -145,10 +145,19 @@
*/
private Thread _receivingThread;
+ /**
+ * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
+ * on the queue. This is used for queue browsing.
+ */
+ private boolean _autoClose;
+ private boolean _closeWhenNoMessages;
+
+ private boolean _noConsume;
+
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector,
- boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
- int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode)
+ boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable,
+ int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
_connection = connection;
@@ -164,6 +173,8 @@
_exclusive = exclusive;
_acknowledgeMode = acknowledgeMode;
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
+ _autoClose = autoClose;
+ _noConsume = noConsume;
}
public AMQDestination getDestination()
@@ -321,6 +332,10 @@
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = null;
if (l > 0)
{
@@ -350,6 +365,19 @@
}
}
+ private boolean closeOnAutoClose() throws JMSException
+ {
+ if(isAutoClose() && _closeWhenNoMessages && _synchronousQueue.isEmpty())
+ {
+ close(false);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
public Message receiveNoWait() throws JMSException
{
checkPreConditions();
@@ -358,6 +386,10 @@
try
{
+ if(closeOnAutoClose())
+ {
+ return null;
+ }
Object o = _synchronousQueue.poll();
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
@@ -402,22 +434,31 @@
}
}
+
public void close() throws JMSException
{
+ close(true);
+ }
+
+ public void close(boolean sendClose) throws JMSException
+ {
synchronized(_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
{
- final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
-
- try
- {
- _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
- }
- catch (AMQException e)
+ if(sendClose)
{
- _logger.error("Error closing consumer: " + e, e);
- throw new JMSException("Error closing consumer: " + e);
+ final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId, _consumerTag, false);
+
+ try
+ {
+ _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error closing consumer: " + e, e);
+ throw new JMSException("Error closing consumer: " + e);
+ }
}
deregisterConsumer();
@@ -513,6 +554,12 @@
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
{
+ case Session.CLIENT_ACKNOWLEDGE:
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ break;
case Session.DUPS_OK_ACKNOWLEDGE:
if (++_outstanding >= _prefetchHigh)
{
@@ -539,7 +586,14 @@
}
break;
case Session.SESSION_TRANSACTED:
- _lastDeliveryTag = msg.getDeliveryTag();
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ _lastDeliveryTag = msg.getDeliveryTag();
+ }
break;
}
}
@@ -629,5 +683,30 @@
public void clearUnackedMessages()
{
_unacknowledgedDeliveryTags.clear();
+ }
+
+ public boolean isAutoClose()
+ {
+ return _autoClose;
+ }
+
+
+ public boolean isNoConsume()
+ {
+ return _noConsume;
+ }
+
+ public void closeWhenNoMessages(boolean b)
+ {
+ _closeWhenNoMessages = b;
+
+ if(_closeWhenNoMessages
+ && _synchronousQueue.isEmpty()
+ && _receiving.get()
+ && _messageListener != null)
+ {
+ _receivingThread.interrupt();
+ }
+
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Tue Dec 26 13:04:28 2006
@@ -20,16 +20,16 @@
*/
package org.apache.qpid.client;
-import java.util.Enumeration;
+import org.apache.qpid.common.QpidProperties;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
+import java.util.Enumeration;
public class QpidConnectionMetaData implements ConnectionMetaData
{
-
QpidConnectionMetaData(AMQConnection conn)
{
}
@@ -46,7 +46,7 @@
public String getJMSProviderName() throws JMSException
{
- return "Apache Qpid";
+ return "Apache " + QpidProperties.getProductName();
}
public String getJMSVersion() throws JMSException
@@ -71,8 +71,8 @@
public String getProviderVersion() throws JMSException
{
- return "QPID (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
- + getProtocolVersion() + "] )";
+ return QpidProperties.getProductName() + " (Client: [" + getClientVersion() + "] ; Broker [" + getBrokerVersion() + "] ; Protocol: [ "
+ + getProtocolVersion() + "] )";
}
private String getProtocolVersion()
@@ -89,8 +89,7 @@
public String getClientVersion()
{
- // TODO - get client build version from properties file or similar
- return "<unknown>";
+ return QpidProperties.getBuildVersion();
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.protocol.AMQConstant;
@@ -46,7 +47,7 @@
public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
- _logger.debug("ChannelClose method received");
+ _logger.debug("ChannelClose method received");
ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
int errorCode = method.replyCode;
@@ -65,17 +66,21 @@
{
throw new AMQNoConsumersException("Error: " + reason, null);
}
+ else if (errorCode == AMQConstant.NO_ROUTE.getCode())
+ {
+ throw new AMQNoRouteException("Error: " + reason, null);
+ }
+ else if (errorCode == AMQConstant.INVALID_SELECTOR.getCode())
+ {
+ _logger.info("Broker responded with Invalid Selector.");
+
+ throw new AMQInvalidSelectorException(reason);
+ }
else
{
- if (errorCode == AMQConstant.NO_ROUTE.getCode())
- {
- throw new AMQNoRouteException("Error: " + reason, null);
- }
- else
- {
- throw new AMQChannelClosedException(errorCode, "Error: " + reason);
- }
+ throw new AMQChannelClosedException(errorCode, "Error: " + reason);
}
+
}
evt.getProtocolSession().channelClosed(evt.getChannelId(), errorCode, reason);
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.client.protocol.AMQMethodEvent;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -119,16 +121,22 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put("instance", ps.getClientID());
- clientProperties.put("product", "Qpid");
- clientProperties.put("version", "1.0");
- clientProperties.put("platform", getFullSystemInfo());
+
+ clientProperties.put(ClientProperties.instance.toString(), ps.getClientID());
+ _log.info("Product name: " + QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.product.toString(), QpidProperties.getProductName());
+ clientProperties.put(ClientProperties.version.toString(), QpidProperties.getReleaseVersion());
+ clientProperties.put(ClientProperties.platform.toString(), getFullSystemInfo());
ps.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(), clientProperties, mechanism,
saslResponse, selectedLocale));
}
catch (UnsupportedEncodingException e)
{
throw new AMQException(_log, "Unable to decode data: " + e, e);
+ }
+ catch (Throwable t)
+ {
+ _log.error("Error: " + t, t);
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Tue Dec 26 13:04:28 2006
@@ -26,7 +26,8 @@
import org.apache.qpid.url.BindingURL;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.*;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.BasicMessageConsumer;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
@@ -46,7 +47,8 @@
protected ByteBuffer _data;
private boolean _readableProperties = false;
- private boolean _readableMessage = false;
+ protected boolean _readableMessage = false;
+ protected boolean _changedData;
private Destination _destination;
private BasicMessageConsumer _consumer;
@@ -60,6 +62,7 @@
}
_readableProperties = false;
_readableMessage = (data != null);
+ _changedData = (data == null);
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) throws AMQException
@@ -521,16 +524,16 @@
return !_readableMessage;
}
- public void reset()
+ public void reset()
{
- if (_readableMessage)
+ if (!_changedData)
{
_data.rewind();
}
else
{
_data.flip();
- _readableMessage = true;
+ _changedData = false;
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Tue Dec 26 13:04:28 2006
@@ -59,6 +59,12 @@
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -226,48 +232,56 @@
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
+ _changedData = true;
_data.putDouble(v);
}
@@ -281,7 +295,7 @@
_data.putShort((short)encodedString.limit());
_data.put(encodedString);
-
+ _changedData = true;
//_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must add the null terminator manually
//_data.put((byte)0);
@@ -298,12 +312,14 @@
{
checkWritable();
_data.put(bytes);
+ _changedData = true;
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
_data.put(bytes, offset, length);
+ _changedData = true;
}
public void writeObject(Object object) throws JMSException
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Tue Dec 26 13:04:28 2006
@@ -112,7 +112,7 @@
}
}
-
+
public Serializable getObject() throws JMSException
{
ObjectInputStream in = null;
@@ -123,18 +123,18 @@
try
{
- _data.rewind();
+ _data.rewind();
in = new ObjectInputStream(_data.asInputStream());
return (Serializable) in.readObject();
}
catch (IOException e)
- {
- e.printStackTrace();
- throw new MessageFormatException("Could not deserialize message: " + e);
+ {
+ e.printStackTrace();
+ throw new MessageFormatException("Could not deserialize message: " + e);
}
catch (ClassNotFoundException e)
{
- e.printStackTrace();
+ e.printStackTrace();
throw new MessageFormatException("Could not deserialize message: " + e);
}
finally
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Tue Dec 26 13:04:28 2006
@@ -86,6 +86,12 @@
super(messageNbr, contentHeader, data);
}
+ public void reset()
+ {
+ super.reset();
+ _readableMessage = true;
+ }
+
public String getMimeType()
{
return MIME_TYPE;
@@ -103,6 +109,7 @@
{
checkWritable();
_data.put(type);
+ _changedData = true;
}
public boolean readBoolean() throws JMSException
@@ -693,7 +700,7 @@
{
_data.putString(string, Charset.forName("UTF-8").newEncoder());
// we must write the null terminator ourselves
- _data.put((byte)0);
+ _data.put((byte) 0);
}
catch (CharacterCodingException e)
{
@@ -706,7 +713,7 @@
public void writeBytes(byte[] bytes) throws JMSException
{
- writeBytes(bytes, 0, bytes == null?0:bytes.length);
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Dec 26 13:04:28 2006
@@ -117,6 +117,7 @@
{
_data.put(text.getBytes(getJmsContentHeaderProperties().getEncoding()));
}
+ _changedData=true;
}
_decodedValue = text;
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Dec 26 13:04:28 2006
@@ -406,4 +406,12 @@
HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay));
}
}
+
+ public void confirmConsumerCancelled(int channelId, String consumerTag)
+ {
+ final Integer chId = channelId;
+ final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
+
+ session.confirmConsumerCancelled(consumerTag);
+ }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Tue Dec 26 13:04:28 2006
@@ -110,7 +110,7 @@
}
else
{
- throw new AMQException("Woken up due to exception", _error); // FIXME: This will wrap FailoverException and prevent it being caught.
+ throw new AMQException("Woken up due to " + _error.getClass(), _error); // FIXME: This will wrap FailoverException and prevent it being caught.
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Dec 26 13:04:28 2006
@@ -103,6 +103,7 @@
frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
+ frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java Tue Dec 26 13:04:28 2006
@@ -59,7 +59,7 @@
// once more testing of the performance of the simple allocator has been done
if (!Boolean.getBoolean("amqj.enablePooledAllocator"))
{
- _logger.warn("Using SimpleByteBufferAllocator");
+ _logger.info("Using SimpleByteBufferAllocator");
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Tue Dec 26 13:04:28 2006
@@ -40,11 +40,15 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
public class PropertiesFileInitialContextFactory implements InitialContextFactory
{
- protected final Logger _logger = Logger.getLogger(getClass());
+ protected final Logger _logger = Logger.getLogger(PropertiesFileInitialContextFactory.class);
private String CONNECTION_FACTORY_PREFIX = "connectionfactory.";
private String DESTINATION_PREFIX = "destination.";
@@ -54,6 +58,41 @@
public Context getInitialContext(Hashtable environment) throws NamingException
{
Map data = new ConcurrentHashMap();
+
+ try
+ {
+
+ String file = null;
+ if (environment.contains(Context.PROVIDER_URL))
+ {
+ file = (String) environment.get(Context.PROVIDER_URL);
+ }
+ else
+ {
+ file = System.getProperty(Context.PROVIDER_URL);
+ }
+
+ if (file != null)
+ {
+ _logger.info("Loading Properties from:" + file);
+ //Load the properties specified
+ Properties p = new Properties();
+
+ p.load(new BufferedInputStream(new FileInputStream(file)));
+
+ environment.putAll(p);
+ _logger.info("Loaded Context Properties:" + environment.toString());
+ }
+ else
+ {
+ _logger.warn("No Provider URL specified.");
+ }
+ }
+ catch (IOException ioe)
+ {
+ _logger.warn("Unable to load property file specified in Provider_URL:" +
+ environment.get(Context.PROVIDER_URL));
+ }
createConnectionFactories(data, environment);
Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java Tue Dec 26 13:04:28 2006
@@ -29,6 +29,7 @@
import javax.jms.*;
import javax.jms.MessageConsumer;
import javax.jms.Session;
+import javax.jms.Message;
import java.util.Hashtable;
import java.io.File;
import java.io.FilenameFilter;
Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Dec 26 13:04:28 2006
@@ -116,7 +116,9 @@
m.setIntProperty("Int", (int) Integer.MAX_VALUE);
m.setJMSCorrelationID("Correlation");
- m.setJMSPriority(100);
+ //fixme the m.setJMSMessage has no effect
+ producer.setPriority(8);
+ m.setJMSPriority(3);
// Queue
Queue q;
@@ -182,10 +184,8 @@
(int) Integer.MAX_VALUE, m.getIntProperty("Int"));
Assert.assertEquals("Check CorrelationID properties are correctly transported",
"Correlation", m.getJMSCorrelationID());
-
- _logger.warn("getJMSPriority not being verified.");
-// Assert.assertEquals("Check Priority properties are correctly transported",
-// 100, m.getJMSPriority());
+ Assert.assertEquals("Check Priority properties are correctly transported",
+ 8, m.getJMSPriority());
// Queue
Assert.assertEquals("Check ReplyTo properties are correctly transported",
Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Tue Dec 26 13:04:28 2006
@@ -121,7 +121,7 @@
{
if (_connection != null)
{
- System.out.println(">>>>>>>>>>>>>>.. closing");
+ _log.info(">>>>>>>>>>>>>>.. closing");
_connection.close();
}
}
@@ -137,7 +137,7 @@
{
public void onException(JMSException jmsException)
{
- _log.error("onException - ", jmsException);
+ _log.warn("onException - "+jmsException.getMessage());
}
});
Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -20,6 +20,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -77,6 +79,11 @@
public int getWeight()
{
return ClusteredSubscriptionManager.this.getWeight();
+ }
+
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
}
public boolean hasActiveSubscribers()
Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Dec 26 13:04:28 2006
@@ -18,12 +18,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -41,11 +41,24 @@
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -56,9 +69,9 @@
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -91,7 +104,7 @@
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
Modified: incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=490372&r1=490371&r2=490372
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Dec 26 13:04:28 2006
@@ -22,6 +22,9 @@
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -73,6 +76,11 @@
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -85,9 +93,49 @@
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
+ }
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public boolean isAutoClose()
+ {
+ return false;
+ }
+
+ public void close()
+ {
+ //no-op
+ }
+
+ public boolean isBrowser()
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
}
}