You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/01/05 11:24:42 UTC

svn commit: r1777449 - in /qpid/java/trunk: bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/o...

Author: lquack
Date: Thu Jan  5 11:24:41 2017
New Revision: 1777449

URL: http://svn.apache.org/viewvc?rev=1777449&view=rev
Log:
QPID-7576: [Java Broker] refactoring related to message metadata

This addresses review comments from alex:
 * remove return value from ConsumerTarget#send
 * call StoredMessage.getContentSize() instead of StoredMessage.getMetaData().getContentSize()
 * assume _handle in AbstractServerMessageImpl is not null
note that the last point differs from review suggestion in that _handle is not explicitly guarded against null.

Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Thu Jan  5 11:24:41 2017
@@ -1311,7 +1311,7 @@ public abstract class AbstractBDBMessage
             if(message.getStoredMessage() instanceof StoredBDBMessage)
             {
                 final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
-                final long contentSize = storedMessage.getMetaData().getContentSize();
+                final long contentSize = storedMessage.getContentSize();
                 _preCommitActions.add(new Runnable()
                 {
                     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Jan  5 11:24:41 2017
@@ -203,17 +203,14 @@ public abstract class AbstractConsumerTa
     }
 
     @Override
-    public final long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+    public final void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
     {
-        // NoAck consumer might delete the message during doSend so we must locally cache the size
-        final long messageSize = entry.getMessage().getSize();
         doSend(consumer, entry, batch);
 
         if (consumer.acquires())
         {
             entry.makeAcquisitionStealable();
         }
-        return messageSize;
     }
 
     protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Jan  5 11:24:41 2017
@@ -60,7 +60,7 @@ public interface ConsumerTarget<T extend
 
     AMQSessionModel<?,T> getSessionModel();
 
-    long send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
+    void send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
 
     boolean sendNextMessage();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Jan  5 11:24:41 2017
@@ -108,12 +108,7 @@ public abstract class AbstractServerMess
                 updated = _refCountUpdater.compareAndSet(this, count, -1);
                 if (updated)
                 {
-                    // must check if the handle is null since there may be cases where we decide to throw away a message
-                    // and the handle has not yet been constructed
-                    if (_handle != null)
-                    {
-                        _handle.remove();
-                    }
+                    _handle.remove();
                 }
             }
             else

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Thu Jan  5 11:24:41 2017
@@ -1145,7 +1145,7 @@ public abstract class AbstractJDBCMessag
                         try
                         {
                             ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
-                            _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+                            _storeSizeIncrease += storedMessage.getContentSize();
                         }
                         catch (SQLException e)
                         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Thu Jan  5 11:24:41 2017
@@ -60,7 +60,7 @@ public class StoredMemoryMessage<T exten
             }
             else
             {
-                final int contentSize = _metaData.getContentSize();
+                final int contentSize = getContentSize();
                 int size = (contentSize < _content.position() + src.remaining())
                         ? _content.position() + src.remaining()
                         : contentSize;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java Thu Jan  5 11:24:41 2017
@@ -42,9 +42,9 @@ class MessageRecord implements Record
         buf.dispose();
 
 
-        _content = new byte[storedMessage.getMetaData().getContentSize()];
+        _content = new byte[storedMessage.getContentSize()];
         buf = QpidByteBuffer.wrap(_content);
-        for(QpidByteBuffer content : storedMessage.getContent(0, storedMessage.getMetaData().getContentSize()))
+        for(QpidByteBuffer content : storedMessage.getContent(0, storedMessage.getContentSize()))
         {
             buf.put(content);
             content.dispose();

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Jan  5 11:24:41 2017
@@ -119,15 +119,13 @@ public class TestConsumerTarget implemen
     {
     }
 
-    public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+    public void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
     {
-        long size = entry.getMessage().getSize();
         if (_messages.contains(entry))
         {
             entry.setRedelivered();
         }
         _messages.add(entry);
-        return size;
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Jan  5 11:24:41 2017
@@ -379,11 +379,11 @@ abstract class AbstractQueueTestBase ext
             }
 
             @Override
-            public long send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+            public void send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
             {
                 try
                 {
-                    return super.send(consumer, entry, batch);
+                    super.send(consumer, entry, batch);
                 }
                 finally
                 {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Jan  5 11:24:41 2017
@@ -139,11 +139,10 @@ public class StandardQueueTest extends A
              * @param entry
              * @param batch
              */
-            public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+            public void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
             {
-                long size = super.send(consumer, entry, batch);
+                super.send(consumer, entry, batch);
                 latch.countDown();
-                return size;
             }
         };
 

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Jan  5 11:24:41 2017
@@ -302,11 +302,11 @@ public class ServerSession extends Sessi
     {
         if (isTransactional() && !(_transaction instanceof DistributedTransaction))
         {
-            _uncommittedMessageSize += handle.getMetaData().getContentSize();
+            _uncommittedMessageSize += handle.getContentSize();
             if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
             {
                 handle.flowToDisk();
-                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
                 {
                     getAMQPConnection().getEventLogger()
                                        .message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jan  5 11:24:41 2017
@@ -560,11 +560,11 @@ public class AMQChannel
     {
         if (isTransactional())
         {
-            _uncommittedMessageSize += handle.getMetaData().getContentSize();
+            _uncommittedMessageSize += handle.getContentSize();
             if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
             {
                 handle.flowToDisk();
-                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
                 {
                     messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
                 }

Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java Thu Jan  5 11:24:41 2017
@@ -279,11 +279,11 @@ public class ProxyMessageSource implemen
         }
 
         @Override
-        public long send(final MessageInstanceConsumer consumer,
+        public void send(final MessageInstanceConsumer consumer,
                          final MessageInstance entry,
                          final boolean batch)
         {
-            return _underlying.send(_consumer, entry, batch);
+            _underlying.send(_consumer, entry, batch);
         }
 
         @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org