You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/09 13:27:15 UTC

[1/3] qpid-broker-j git commit: QPID-7958: [Java Broker] Ensure SystemMessageSource messages are deleted even when they are not consumed

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 3eb2426b0 -> bd5b95147


QPID-7958: [Java Broker] Ensure SystemMessageSource messages are deleted even when they are not consumed


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0a447b66
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0a447b66
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0a447b66

Branch: refs/heads/master
Commit: 0a447b66e58a7d63796f88348d62c6131024170e
Parents: 2967b68
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 12:24:06 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 13:32:31 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/virtualhost/AbstractSystemMessageSource.java  | 7 ++++++-
 .../qpid/server/management/amqp/ManagementNodeConsumer.java   | 1 +
 .../qpid/server/management/amqp/ManagementResponse.java       | 1 +
 3 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 3d6d009..3693c70 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -193,6 +193,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         @Override
         public void close()
         {
+            _queue.forEach(PropertiesMessageInstance::delete);
             _consumers.remove(this);
         }
 
@@ -400,12 +401,16 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         @Override
         public void release(MessageInstanceConsumer<?> consumer)
         {
-            release();
+            if (isAcquiredBy(consumer))
+            {
+                release();
+            }
         }
 
         @Override
         public void delete()
         {
+            _messageReference.release();
             _isDeleted = true;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 10e9491..7035210 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -127,6 +127,7 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
     @Override
     public void close()
     {
+        _queue.forEach(ManagementResponse::delete);
         _managementNode.unregisterConsumer(this);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a447b66/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index ad830ba..8e9e5c8 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -219,6 +219,7 @@ class ManagementResponse implements MessageInstance
     @Override
     public void delete()
     {
+        _messageReference.release();
         _isDeleted = true;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7958: [Java Broker] Ensure a message reference is always taken for SystemMessageSource messages

Posted by lq...@apache.org.
QPID-7958: [Java Broker] Ensure a message reference is always taken for SystemMessageSource messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2967b682
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2967b682
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2967b682

Branch: refs/heads/master
Commit: 2967b682326ae7f0f862a0086ae54ea733c5cc23
Parents: 3eb2426
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 12:08:21 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 13:32:31 2017 +0100

----------------------------------------------------------------------
 .../AbstractSystemMessageSource.java            | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2967b682/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 24e19f5..3d6d009 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -34,13 +34,14 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -115,15 +116,14 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         return MessageConversionExceptionHandlingPolicy.CLOSE;
     }
 
-    protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
+    protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, TransactionLogResource
     {
 
         private final List<PropertiesMessageInstance> _queue =
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final T _target;
         private final String _name;
-        private final Object _identifier = new Object();
-
+        private final UUID _identifier = UUID.randomUUID();
 
         public Consumer(final String consumerName, T target)
         {
@@ -161,7 +161,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
                 if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                 {
                     _queue.remove(0);
-                    return new MessageContainer(propertiesMessageInstance, null, false);
+                    return new MessageContainer(propertiesMessageInstance,
+                                                propertiesMessageInstance.getMessageReference(),
+                                                false);
                 }
             }
             return null;
@@ -200,6 +202,18 @@ public abstract class AbstractSystemMessageSource implements MessageSource
             return _name;
         }
 
+        @Override
+        public UUID getId()
+        {
+            return _identifier;
+        }
+
+        @Override
+        public MessageDurability getMessageDurability()
+        {
+            return MessageDurability.NEVER;
+        }
+
         public void send(final InternalMessage response)
         {
             _queue.add(new PropertiesMessageInstance(this, response));
@@ -210,6 +224,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
     class PropertiesMessageInstance implements MessageInstance
     {
         private final Consumer _consumer;
+        private final MessageReference _messageReference;
         private int _deliveryCount;
         private boolean _isRedelivered;
         private boolean _isDelivered;
@@ -220,6 +235,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         {
             _consumer = consumer;
             _message = message;
+            _messageReference = message.newReference(consumer);
         }
 
         @Override
@@ -429,5 +445,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
             return AbstractSystemMessageSource.this;
         }
 
+        public MessageReference getMessageReference()
+        {
+            return _messageReference;
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7958: [Java Broker] [AMQP Management] Dispose converted message on the incoming request path

Posted by lq...@apache.org.
QPID-7958: [Java Broker] [AMQP Management] Dispose converted message on the incoming request path


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bd5b9514
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bd5b9514
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bd5b9514

Branch: refs/heads/master
Commit: bd5b951470d2c78392cdbc8e09811c6922a464ef
Parents: 0a447b6
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 14:26:53 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 14:26:53 2017 +0100

----------------------------------------------------------------------
 .../server/management/amqp/ManagementNode.java  | 23 +++++++++++---------
 1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bd5b9514/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 4790b78..8efb2af 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -395,16 +395,8 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
     {
     }
 
-    private synchronized void enqueue(InternalMessage message,
-                                      Action<? super MessageInstance> postEnqueueAction)
+    private synchronized void processRequest(InternalMessage message)
     {
-        if(postEnqueueAction != null)
-        {
-            postEnqueueAction.performAction(new ConsumedMessageInstance(message));
-        }
-
-
-
         String id = (String) message.getMessageHeader().getHeader(IDENTITY_ATTRIBUTE);
         String type = (String) message.getMessageHeader().getHeader(TYPE_ATTRIBUTE);
         String operation = (String) message.getMessageHeader().getHeader(OPERATION_HEADER);
@@ -444,7 +436,18 @@ class ManagementNode implements MessageSource, MessageDestination, BaseQueue
 
         final InternalMessage msg = converter.convert(message, _addressSpace);
 
-        enqueue(msg, action);
+        try
+        {
+            if (action != null)
+            {
+                action.performAction(new ConsumedMessageInstance(msg));
+            }
+            processRequest(msg);
+        }
+        finally
+        {
+            converter.dispose(msg);
+        }
 
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org