You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/09 13:27:15 UTC
[1/3] qpid-broker-j git commit: QPID-7958: [Java Broker] Ensure
SystemMessageSource messages are deleted even when they are not consumed
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 3eb2426b0 -> bd5b95147
QPID-7958: [Java Broker] Ensure SystemMessageSource messages are deleted even when they are not consumed
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0a447b66
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0a447b66
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0a447b66
Branch: refs/heads/master
Commit: 0a447b66e58a7d63796f88348d62c6131024170e
Parents: 2967b68
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 12:24:06 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 13:32:31 2017 +0100
----------------------------------------------------------------------
.../qpid/server/virtualhost/AbstractSystemMessageSource.java | 7 ++++++-
.../qpid/server/management/amqp/ManagementNodeConsumer.java | 1 +
.../qpid/server/management/amqp/ManagementResponse.java | 1 +
3 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 3d6d009..3693c70 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -193,6 +193,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
@Override
public void close()
{
+ _queue.forEach(PropertiesMessageInstance::delete);
_consumers.remove(this);
}
@@ -400,12 +401,16 @@ public abstract class AbstractSystemMessageSource implements MessageSource
@Override
public void release(MessageInstanceConsumer<?> consumer)
{
- release();
+ if (isAcquiredBy(consumer))
+ {
+ release();
+ }
}
@Override
public void delete()
{
+ _messageReference.release();
_isDeleted = true;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 10e9491..7035210 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -127,6 +127,7 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
@Override
public void close()
{
+ _queue.forEach(ManagementResponse::delete);
_managementNode.unregisterConsumer(this);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index ad830ba..8e9e5c8 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -219,6 +219,7 @@ class ManagementResponse implements MessageInstance
@Override
public void delete()
{
+ _messageReference.release();
_isDeleted = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-7958: [Java Broker] Ensure a
message reference is always taken for SystemMessageSource messages
Posted by lq...@apache.org.
QPID-7958: [Java Broker] Ensure a message reference is always taken for SystemMessageSource messages
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2967b682
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2967b682
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2967b682
Branch: refs/heads/master
Commit: 2967b682326ae7f0f862a0086ae54ea733c5cc23
Parents: 3eb2426
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 12:08:21 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 13:32:31 2017 +0100
----------------------------------------------------------------------
.../AbstractSystemMessageSource.java | 30 ++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2967b682/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 24e19f5..3d6d009 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -34,13 +34,14 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -115,15 +116,14 @@ public abstract class AbstractSystemMessageSource implements MessageSource
return MessageConversionExceptionHandlingPolicy.CLOSE;
}
- protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
+ protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, TransactionLogResource
{
private final List<PropertiesMessageInstance> _queue =
Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
private final T _target;
private final String _name;
- private final Object _identifier = new Object();
-
+ private final UUID _identifier = UUID.randomUUID();
public Consumer(final String consumerName, T target)
{
@@ -161,7 +161,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
{
_queue.remove(0);
- return new MessageContainer(propertiesMessageInstance, null, false);
+ return new MessageContainer(propertiesMessageInstance,
+ propertiesMessageInstance.getMessageReference(),
+ false);
}
}
return null;
@@ -200,6 +202,18 @@ public abstract class AbstractSystemMessageSource implements MessageSource
return _name;
}
+ @Override
+ public UUID getId()
+ {
+ return _identifier;
+ }
+
+ @Override
+ public MessageDurability getMessageDurability()
+ {
+ return MessageDurability.NEVER;
+ }
+
public void send(final InternalMessage response)
{
_queue.add(new PropertiesMessageInstance(this, response));
@@ -210,6 +224,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
class PropertiesMessageInstance implements MessageInstance
{
private final Consumer _consumer;
+ private final MessageReference _messageReference;
private int _deliveryCount;
private boolean _isRedelivered;
private boolean _isDelivered;
@@ -220,6 +235,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
{
_consumer = consumer;
_message = message;
+ _messageReference = message.newReference(consumer);
}
@Override
@@ -429,5 +445,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
return AbstractSystemMessageSource.this;
}
+ public MessageReference getMessageReference()
+ {
+ return _messageReference;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-7958: [Java Broker] [AMQP
Management] Dispose converted message on the incoming request path
Posted by lq...@apache.org.
QPID-7958: [Java Broker] [AMQP Management] Dispose converted message on the incoming request path
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bd5b9514
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bd5b9514
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bd5b9514
Branch: refs/heads/master
Commit: bd5b951470d2c78392cdbc8e09811c6922a464ef
Parents: 0a447b6
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 14:26:53 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 14:26:53 2017 +0100
----------------------------------------------------------------------
.../server/management/amqp/ManagementNode.java | 23 +++++++++++---------
1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd5b9514/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 4790b78..8efb2af 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -395,16 +395,8 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
{
}
- private synchronized void enqueue(InternalMessage message,
- Action<? super MessageInstance> postEnqueueAction)
+ private synchronized void processRequest(InternalMessage message)
{
- if(postEnqueueAction != null)
- {
- postEnqueueAction.performAction(new ConsumedMessageInstance(message));
- }
-
-
-
String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE);
String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE);
String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER);
@@ -444,7 +436,18 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
final InternalMessage msg = converter.convert(message, _addressSpace);
- enqueue(msg, action);
+ try
+ {
+ if (action != null)
+ {
+ action.performAction(new ConsumedMessageInstance(msg));
+ }
+ processRequest(msg);
+ }
+ finally
+ {
+ converter.dispose(msg);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org