You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/03/30 18:02:46 UTC
svn commit: r1087001 - in /qpid/branches/0.5.x-dev/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/j...
Author: robbie
Date: Wed Mar 30 16:02:45 2011
New Revision: 1087001
URL: http://svn.apache.org/viewvc?rev=1087001&view=rev
Log:
QPID-3167: add a unit test of SimpleAMQQueue#processQueue to check delivery when subscriptions with unique selectors are in use
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
Modified: qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Wed Mar 30 16:02:45 2011
@@ -210,9 +210,9 @@ public class DiagnosticExchange extends
Long value = new Long(SizeOf.getUsedMemory());
AMQShortString key = new AMQShortString("memory");
- FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).getHeaders();
+ FieldTable headers = ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).getHeaders();
headers.put(key, value);
- ((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
+ ((BasicContentHeaderProperties)payload.getContentHeaderBody().getProperties()).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new AMQShortString("diagnosticqueue"));
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Mar 30 16:02:45 2011
@@ -224,7 +224,7 @@ public class AMQChannel
finally
{
long bodySize = _currentMessage.getContentHeaderBody().bodySize;
- long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().properties).getTimestamp();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeaderBody().getProperties()).getTimestamp();
_session.registerMessageReceived(bodySize, timestamp);
// callback to allow the context to do any post message processing
// primary use is to allow message return processing in the non-tx case
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Mar 30 16:02:45 2011
@@ -351,7 +351,7 @@ public class HeadersExchange extends Abs
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
//but these are not yet implemented.
- return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+ return ((BasicContentHeaderProperties) contentHeaderFrame.getProperties()).getHeaders();
}
protected ExchangeMBean createMBean() throws AMQException
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Wed Mar 30 16:02:45 2011
@@ -110,7 +110,7 @@ public class PropertyExpression<E extend
{
CommonContentHeaderProperties _properties =
- (CommonContentHeaderProperties) message.getContentHeaderBody().properties;
+ (CommonContentHeaderProperties) message.getContentHeaderBody().getProperties();
if (_logger.isDebugEnabled())
{
@@ -165,7 +165,7 @@ public class PropertyExpression<E extend
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString replyTo = _properties.getReplyTo();
return (replyTo == null) ? null : replyTo.toString();
@@ -180,7 +180,7 @@ public class PropertyExpression<E extend
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString type = _properties.getType();
return (type == null) ? null : type.toString();
@@ -208,7 +208,7 @@ public class PropertyExpression<E extend
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return (int) _properties.getPriority();
}
@@ -221,7 +221,7 @@ public class PropertyExpression<E extend
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString messageId = _properties.getMessageId();
return (messageId == null) ? null : messageId;
@@ -235,7 +235,7 @@ public class PropertyExpression<E extend
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return _properties.getTimestamp();
}
@@ -247,7 +247,7 @@ public class PropertyExpression<E extend
{
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
AMQShortString correlationId = _properties.getCorrelationId();
return (correlationId == null) ? null : correlationId.toString();
@@ -261,7 +261,7 @@ public class PropertyExpression<E extend
CommonContentHeaderProperties _properties =
(CommonContentHeaderProperties)
- message.getContentHeaderBody().properties;
+ message.getContentHeaderBody().getProperties();
return _properties.getExpiration();
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Mar 30 16:02:45 2011
@@ -410,7 +410,7 @@ public class AMQQueueMBean extends AMQMa
{
// Create header attributes list
CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
+ (CommonContentHeaderProperties) msg.getContentHeaderBody().getProperties();
String mimeType = null, encoding = null;
if (headerProperties != null)
{
@@ -493,7 +493,7 @@ public class AMQQueueMBean extends AMQMa
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Wed Mar 30 16:02:45 2011
@@ -59,7 +59,7 @@ public class ConflationQueueList extends
try
{
- Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey);
+ Object value = ((BasicContentHeaderProperties)message.getContentHeaderBody().getProperties()).getHeaders().get(_conflationKey);
if(value != null)
{
latestValueReference = _latestValuesMap.get(value);
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Wed Mar 30 16:02:45 2011
@@ -94,9 +94,9 @@ public class IncomingMessage implements
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -176,8 +176,8 @@ public class IncomingMessage implements
// now that it has all been received, before we attempt delivery
_txnContext.messageFullyReceived(isPersistent());
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
+ AMQShortString userID = getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties ?
+ ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getUserId() : null;
if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
{
@@ -280,8 +280,8 @@ public class IncomingMessage implements
public boolean isPersistent()
{
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
+ return getContentHeaderBody().getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeaderBody().getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Mar 30 16:02:45 2011
@@ -57,7 +57,7 @@ public class PriorityQueueList implement
{
try
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
+ int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().getProperties()))).getPriority() - _priorityOffset;
if(index >= _priorities)
{
index = _priorities-1;
Added: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1087001&view=auto
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (added)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Wed Mar 30 16:02:45 2011
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+
+public class QueueRunner implements ReadWriteRunnable
+{
+ private static final Logger _logger = Logger.getLogger(QueueRunner.class);
+
+ private String _name;
+ private SimpleAMQQueue _queue;
+
+ public QueueRunner(SimpleAMQQueue queue, long count)
+ {
+ _queue = queue;
+ _name = "QueueRunner-" + count + "-" + _queue.getLogActor();
+ }
+
+ public void run()
+ {
+ String originalName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(_name);
+ CurrentActor.set(_queue.getLogActor());
+
+ _queue.processQueue(this);
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ Thread.currentThread().setName(originalName);
+ }
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+
+ public String toString()
+ {
+ return _name;
+ }
+}
\ No newline at end of file
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Mar 30 16:02:45 2011
@@ -1230,10 +1230,9 @@ public class SimpleAMQQueue implements A
}
}
-
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1246,52 +1245,6 @@ public class SimpleAMQQueue implements A
_asyncDelivery.execute(new SubFlushRunner(sub));
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
private class SubFlushRunner implements ReadWriteRunnable
{
private final Subscription _sub;
@@ -1529,7 +1482,7 @@ public class SimpleAMQQueue implements A
* @param runner the Runner to schedule
* @throws AMQException
*/
- private void processQueue(Runnable runner) throws AMQException
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
@@ -1897,4 +1850,9 @@ public class SimpleAMQQueue implements A
throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Wed Mar 30 16:02:45 2011
@@ -120,8 +120,8 @@ public class TransientMessageData
public boolean isPersistent()
{
- return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() ==
+ return _contentHeaderBody.getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java Wed Mar 30 16:02:45 2011
@@ -361,7 +361,7 @@ public class Show extends AbstractComman
try
{
- headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().properties);
+ headers = ((BasicContentHeaderProperties) msg.getContentHeaderBody().getProperties());
}
catch (AMQException e)
{
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Wed Mar 30 16:02:45 2011
@@ -227,7 +227,7 @@ public class AbstractHeadersExchangeTest
static ContentHeaderBody getContentHeader(FieldTable headers)
{
ContentHeaderBody header = new ContentHeaderBody();
- header.properties = getProperties(headers);
+ header.setProperties(getProperties(headers));
return header;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java Wed Mar 30 16:02:45 2011
@@ -95,7 +95,7 @@ public class AMQPriorityQueueTest extend
AMQMessage msg = super.createMessage(id);
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
props.setPriority(i);
- msg.getContentHeaderBody().properties = props;
+ msg.getContentHeaderBody().setProperties(props);
return msg;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Wed Mar 30 16:02:45 2011
@@ -448,8 +448,8 @@ public class AMQQueueMBeanTest extends T
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) (persistent ? 2 : 1));
IncomingMessage msg = new IncomingMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, _protocolSession);
msg.setContentHeaderBody(contentHeaderBody);
return msg;
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java Wed Mar 30 16:02:45 2011
@@ -144,7 +144,7 @@ public class AckTest extends TestCase
//This is DeliveryMode.PERSISTENT
b.setDeliveryMode((byte) 2);
ContentHeaderBody cb = new ContentHeaderBody();
- cb.properties = b;
+ cb.setProperties(b);
msg.setContentHeaderBody(cb);
}
else
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Wed Mar 30 16:02:45 2011
@@ -31,19 +31,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -405,8 +401,8 @@ public class SimpleAMQQueueTest extends
NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null);
IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null);
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
- contentHeaderBody.properties = new BasicContentHeaderProperties();
- ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+ contentHeaderBody.setProperties(new BasicContentHeaderProperties());
+ ((BasicContentHeaderProperties) contentHeaderBody.getProperties()).setDeliveryMode((byte) 2);
msg.setContentHeaderBody(contentHeaderBody);
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
@@ -432,6 +428,112 @@ public class SimpleAMQQueueTest extends
assertNull(data);
}
+ /**
+ * processQueue() is used when asynchronously delivering messages to
+ * subscriptions which could not be delivered immediately during the
+ * enqueue() operation.
+ *
+ * A defect within the method would mean that delivery of these messages may
+ * not occur should the Runner stop before all messages have been processed.
+ * Such a defect was discovered when Selectors were used such that one and
+ * only one subscription can/will accept any given messages, but multiple
+ * subscriptions are present, and one of the earlier subscriptions receives
+ * more messages than the others.
+ *
+ * This test is to validate that the processQueue() method is able to
+ * correctly deliver all of the messages present for asynchronous delivery
+ * to subscriptions in such a scenario.
+ */
+ public void testProcessQueueWithUniqueSelectors() throws Exception
+ {
+ StoreContext testContext = new StoreContext();
+ TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString(
+ "testQueue"), false, new AMQShortString("testOwner"), false,
+ _virtualHost, factory)
+ {
+ @Override
+ public void deliverAsync(Subscription sub)
+ {
+ // do nothing, i.e prevent deliveries by the SubFlushRunner
+ // when registering the new subscriptions
+ }
+ };
+
+ // retrieve the QueueEntryList the queue creates and insert the test
+ // messages, thus avoiding straight-through delivery attempts during
+ //enqueue() process.
+ QueueEntryList list = factory.getQueueEntryList();
+ assertNotNull("QueueEntryList should have been created", list);
+
+ QueueEntry msg1 = list.add(createMessage(1L), testContext);
+ QueueEntry msg2 = list.add(createMessage(2L), testContext);
+ QueueEntry msg3 = list.add(createMessage(3L), testContext);
+ QueueEntry msg4 = list.add(createMessage(4L), testContext);
+ QueueEntry msg5 = list.add(createMessage(5L), testContext);
+
+ // Create lists of the entries each subscription should be interested
+ // in.Bias over 50% of the messages to the first subscription so that
+ // the later subscriptions reject them and report being done before
+ // the first subscription as the processQueue method proceeds.
+ List<QueueEntry> msgListSub1 = createEntriesList(msg1, msg2, msg3);
+ List<QueueEntry> msgListSub2 = createEntriesList(msg4);
+ List<QueueEntry> msgListSub3 = createEntriesList(msg5);
+
+ MockSubscription sub1 = new MockSubscription(msgListSub1);
+ MockSubscription sub2 = new MockSubscription(msgListSub2);
+ MockSubscription sub3 = new MockSubscription(msgListSub3);
+
+ // register the subscriptions
+ testQueue.registerSubscription(sub1, false);
+ testQueue.registerSubscription(sub2, false);
+ testQueue.registerSubscription(sub3, false);
+
+ //check that no messages have been delivered to the
+ //subscriptions during registration
+ assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
+ assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
+
+ // call processQueue to deliver the messages
+ testQueue.processQueue(new QueueRunner(testQueue, 1)
+ {
+ @Override
+ public void run()
+ {
+ // we dont actually want/need this runner to do any work
+ // because we we are already doing it!
+ }
+ });
+
+ // check expected messages delivered to correct consumers
+ verifyRecievedMessages(msgListSub1, sub1.getMessages());
+ verifyRecievedMessages(msgListSub2, sub2.getMessages());
+ verifyRecievedMessages(msgListSub3, sub3.getMessages());
+ }
+
+ private List<QueueEntry> createEntriesList(QueueEntry... entries)
+ {
+ ArrayList<QueueEntry> entriesList = new ArrayList<QueueEntry>();
+ for (QueueEntry entry : entries)
+ {
+ entriesList.add(entry);
+ }
+ return entriesList;
+ }
+
+ private void verifyRecievedMessages(List<QueueEntry> expected,
+ List<QueueEntry> delivered)
+ {
+ assertEquals("Consumer did not receive the expected number of messages",
+ expected.size(), delivered.size());
+
+ for (QueueEntry msg : expected)
+ {
+ assertTrue("Consumer did not recieve msg: "
+ + msg.getMessage().getMessageId(), delivered.contains(msg));
+ }
+ }
// FIXME: move this to somewhere useful
private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody)
@@ -495,4 +597,20 @@ public class SimpleAMQQueueTest extends
AMQMessage messageA = new TestMessage(id, id, info, new StoreContext());
return messageA;
}
+
+ class TestSimpleQueueEntryListFactory implements QueueEntryListFactory
+ {
+ QueueEntryList _list;
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ _list = new SimpleQueueEntryList(queue);
+ return _list;
+ }
+
+ public QueueEntryList getQueueEntryList()
+ {
+ return _list;
+ }
+ }
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Wed Mar 30 16:02:45 2011
@@ -366,7 +366,7 @@ public class MessageStoreTest extends Te
headerBody.classId = BasicConsumeBodyImpl.CLASS_ID;
headerBody.bodySize = 0;
- headerBody.properties = properties;
+ headerBody.setProperties(properties);
try
{
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java Wed Mar 30 16:02:45 2011
@@ -105,7 +105,7 @@ public class TestReferenceCounting exten
ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
bchp.setDeliveryMode((byte)2);
- chb.properties = bchp;
+ chb.setProperties(bchp);
return chb;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Wed Mar 30 16:02:45 2011
@@ -25,12 +25,12 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -46,11 +46,21 @@ public class MockSubscription implements
private State _state = State.ACTIVE;
private ArrayList<QueueEntry> messages = new ArrayList<QueueEntry>();
private final Lock _stateChangeLock = new ReentrantLock();
+ private List<QueueEntry> _acceptEntries = null;
private static final AtomicLong idGenerator = new AtomicLong(0);
// Create a simple ID that increments for ever new Subscription
private final long _subscriptionID = idGenerator.getAndIncrement();
+ public MockSubscription()
+ {
+ }
+
+ public MockSubscription(List<QueueEntry> acceptEntries)
+ {
+ _acceptEntries = acceptEntries;
+ }
+
public void close()
{
_closed = true;
@@ -101,8 +111,15 @@ public class MockSubscription implements
_stateChangeLock.lock();
}
- public boolean hasInterest(QueueEntry msg)
+ public boolean hasInterest(QueueEntry entry)
{
+ if(_acceptEntries != null)
+ {
+ //simulate selector behaviour, only signal
+ //interest in the dictated queue entries
+ return _acceptEntries.contains(entry);
+ }
+
return true;
}
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java Wed Mar 30 16:02:45 2011
@@ -193,7 +193,7 @@ public class InternalBrokerBaseCase exte
//Make Message Persistent
properties.setDeliveryMode((byte) 2);
- _headerBody.properties = properties;
+ _headerBody.setProperties(properties);
channel.publishContentHeader(_headerBody);
}
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Mar 30 16:02:45 2011
@@ -99,7 +99,7 @@ public abstract class AbstractJMSMessage
}
AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
- (BasicContentHeaderProperties) contentHeader.properties,
+ (BasicContentHeaderProperties) contentHeader.getProperties(),
exchange, routingKey);
return createMessage(delegate, data);
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Mar 30 16:02:45 2011
@@ -103,7 +103,7 @@ public class MessageFactoryRegistry
AMQShortString routingKey, ContentHeaderBody contentHeader, List bodies)
throws AMQException, JMSException
{
- BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.properties;
+ BasicContentHeaderProperties properties = (BasicContentHeaderProperties) contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
Modified: qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java Wed Mar 30 16:02:45 2011
@@ -258,7 +258,7 @@ public class BasicDeliverTest
static ContentHeaderBody createContentHeaderBody()
{
ContentHeaderBody body = new ContentHeaderBody();
- body.properties = new BasicContentHeaderProperties();
+ body.setProperties(new BasicContentHeaderProperties());
body.weight = 1;
body.classId = 6;
return body;
Modified: qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=1087001&r1=1087000&r2=1087001&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ qpid/branches/0.5.x-dev/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Wed Mar 30 16:02:45 2011
@@ -36,7 +36,7 @@ public class ContentHeaderBody implement
public long bodySize;
/** must never be null */
- public ContentHeaderProperties properties;
+ private ContentHeaderProperties properties;
public ContentHeaderBody()
{
@@ -128,4 +128,14 @@ public class ContentHeaderBody implement
{
return new AMQFrame(channelId, body);
}
+
+ public ContentHeaderProperties getProperties()
+ {
+ return properties;
+ }
+
+ public void setProperties(ContentHeaderProperties props)
+ {
+ properties = props;
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org