You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/19 16:42:54 UTC
svn commit: r530432 - in /incubator/qpid/branches/M2/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/queue/
cluster/src/main/java/org/apache/qpid/server/queue/
systests/src/main/java/org/apache/qpid/server...
Author: ritchiem
Date: Thu Apr 19 07:42:53 2007
New Revision: 530432
URL: http://svn.apache.org/viewvc?view=rev&rev=530432
Log:
QPID-459 - NoLocal broken when messages already exist on queue from consumer. With test.
AMQChannel remove comment around setPublisher - this is used by noLocal implementation.
Subscription - rename of hasFilters to filtersMessages
AMQQueue/RemoteSubscriptionImpl/SubscriptionTestHelper/SubscriptionSet - rename of hasFilters to filtersMessages
SubscriptionImpl - rename of hasFilters to filtersMessages and changes to include noLocal in that check.
TopicSessionTest - Additional testing for NoLocal to ensure.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Apr 19 07:42:53 2007
@@ -197,7 +197,6 @@
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
_txnContext);
- // TODO: used in clustering only I think (RG)
_currentMessage.setPublisher(publisher);
}
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Apr 19 07:42:53 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.server.queue;
import java.text.MessageFormat;
-import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -237,8 +236,10 @@
/**
* Returns messages within the given range of message Ids
+ *
* @param fromMessageId
* @param toMessageId
+ *
* @return List of messages
*/
public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
@@ -253,6 +254,7 @@
/**
* @param messageId
+ *
* @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
*/
public AMQMessage getMessageOnTheQueue(long messageId)
@@ -267,10 +269,10 @@
/**
* moves messages from this queue to another queue. to do this the approach is following- - setup the queue for
- * moving messages (stop the async delivery) - get all the messages available in the given message
- * id range - setup the other queue for moving messages (stop the async delivery) - send these
- * available messages to the other queue (enqueue in other queue) - Once sending to other Queue is successful,
- * remove messages from this queue - remove locks from both queues and start async delivery
+ * moving messages (stop the async delivery) - get all the messages available in the given message id range - setup
+ * the other queue for moving messages (stop the async delivery) - send these available messages to the other queue
+ * (enqueue in other queue) - Once sending to other Queue is successful, remove messages from this queue - remove
+ * locks from both queues and start async delivery
*
* @param fromMessageId
* @param toMessageId
@@ -442,7 +444,7 @@
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
- if (subscription.hasFilters())
+ if (subscription.filtersMessages())
{
if (_deliveryMgr.hasQueuedMessages())
{
@@ -641,7 +643,7 @@
{
_totalMessagesReceived.incrementAndGet();
}
-
+
try
{
_managedObject.checkForNotification(msg);
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Thu Apr 19 07:42:53 2007
@@ -32,7 +32,7 @@
void queueDeleted(AMQQueue queue) throws AMQException;
- boolean hasFilters();
+ boolean filtersMessages();
boolean hasInterest(AMQMessage msg);
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Thu Apr 19 07:42:53 2007
@@ -158,7 +158,7 @@
}
- if (_filters != null)
+ if (filtersMessages())
{
_messages = new ConcurrentLinkedMessageQueueAtomicSize<AMQMessage>();
}
@@ -346,9 +346,9 @@
channel.queueDeleted(queue);
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
- return _filters != null;
+ return _filters != null || _noLocal;
}
public boolean hasInterest(AMQMessage msg)
@@ -363,7 +363,10 @@
// return false;
}
- if (_noLocal)
+ final AMQProtocolSession publisher = msg.getPublisher();
+
+ //todo - client id should be recoreded and this test removed but handled below
+ if (_noLocal && publisher != null)
{
// We don't want local messages so check to see if message is one we sent
Object localInstance;
@@ -372,8 +375,9 @@
if ((protocolSession.getClientProperties() != null) &&
(localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
- if ((msg.getPublisher().getClientProperties() != null) &&
- (msgInstance = msg.getPublisher().getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
+
+ if ((publisher.getClientProperties() != null) &&
+ (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null)
{
if (localInstance == msgInstance || localInstance.equals(msgInstance))
{
@@ -388,8 +392,11 @@
}
else
{
+
localInstance = protocolSession.getClientIdentifier();
- msgInstance = msg.getPublisher().getClientIdentifier();
+ //todo - client id should be recoreded and this test removed but handled here
+
+ msgInstance = publisher.getClientIdentifier();
if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance)))
{
if (_logger.isTraceEnabled())
@@ -399,7 +406,6 @@
}
return false;
}
-
}
@@ -623,7 +629,7 @@
return _resendQueue;
}
- if (_filters != null)
+ if (filtersMessages())
{
if (isAutoClose())
{
Modified: incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Thu Apr 19 07:42:53 2007
@@ -157,7 +157,7 @@
//FIXME the queue could be full of sent messages.
// Either need to clean all PDQs after sending a message
// OR have a clean up thread that runs the PDQs expunging the messages.
- if (!subscription.hasFilters() || subscription.getPreDeliveryQueue().isEmpty())
+ if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty())
{
return subscription;
}
Modified: incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original)
+++ incubator/qpid/branches/M2/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Thu Apr 19 07:42:53 2007
@@ -102,7 +102,7 @@
}
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}
Modified: incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=530432&r1=530431&r2=530432
==============================================================================
--- incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original)
+++ incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Thu Apr 19 07:42:53 2007
@@ -87,7 +87,7 @@
{
}
- public boolean hasFilters()
+ public boolean filtersMessages()
{
return false;
}