You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/09 13:27:16 UTC

[2/3] qpid-broker-j git commit: QPID-7958: [Java Broker] Ensure a message reference is always taken for SystemMessageSource messages

QPID-7958: [Java Broker] Ensure a message reference is always taken for SystemMessageSource messages


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2967b682
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2967b682
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2967b682

Branch: refs/heads/master
Commit: 2967b682326ae7f0f862a0086ae54ea733c5cc23
Parents: 3eb2426
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Oct 9 12:08:21 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Oct 9 13:32:31 2017 +0100

----------------------------------------------------------------------
 .../AbstractSystemMessageSource.java            | 30 ++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2967b682/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 24e19f5..3d6d009 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -34,13 +34,14 @@ import org.apache.qpid.server.consumer.ConsumerTarget;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.message.MessageContainer;
 import org.apache.qpid.server.session.AMQPSession;
 import org.apache.qpid.server.store.MessageDurability;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
@@ -115,15 +116,14 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         return MessageConversionExceptionHandlingPolicy.CLOSE;
     }
 
-    protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>
+    protected class Consumer<T extends ConsumerTarget> implements MessageInstanceConsumer<T>, TransactionLogResource
     {
 
         private final List<PropertiesMessageInstance> _queue =
                 Collections.synchronizedList(new ArrayList<PropertiesMessageInstance>());
         private final T _target;
         private final String _name;
-        private final Object _identifier = new Object();
-
+        private final UUID _identifier = UUID.randomUUID();
 
         public Consumer(final String consumerName, T target)
         {
@@ -161,7 +161,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
                 if (!_target.isSuspended() && _target.allocateCredit(propertiesMessageInstance.getMessage()))
                 {
                     _queue.remove(0);
-                    return new MessageContainer(propertiesMessageInstance, null, false);
+                    return new MessageContainer(propertiesMessageInstance,
+                                                propertiesMessageInstance.getMessageReference(),
+                                                false);
                 }
             }
             return null;
@@ -200,6 +202,18 @@ public abstract class AbstractSystemMessageSource implements MessageSource
             return _name;
         }
 
+        @Override
+        public UUID getId()
+        {
+            return _identifier;
+        }
+
+        @Override
+        public MessageDurability getMessageDurability()
+        {
+            return MessageDurability.NEVER;
+        }
+
         public void send(final InternalMessage response)
         {
             _queue.add(new PropertiesMessageInstance(this, response));
@@ -210,6 +224,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
     class PropertiesMessageInstance implements MessageInstance
     {
         private final Consumer _consumer;
+        private final MessageReference _messageReference;
         private int _deliveryCount;
         private boolean _isRedelivered;
         private boolean _isDelivered;
@@ -220,6 +235,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
         {
             _consumer = consumer;
             _message = message;
+            _messageReference = message.newReference(consumer);
         }
 
         @Override
@@ -429,5 +445,9 @@ public abstract class AbstractSystemMessageSource implements MessageSource
             return AbstractSystemMessageSource.this;
         }
 
+        public MessageReference getMessageReference()
+        {
+            return _messageReference;
+        }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org