You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/01/05 11:24:42 UTC
svn commit: r1777449 - in /qpid/java/trunk:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/o...
Author: lquack
Date: Thu Jan 5 11:24:41 2017
New Revision: 1777449
URL: http://svn.apache.org/viewvc?rev=1777449&view=rev
Log:
QPID-7576: [Java Broker] refactoring related to message metadata
This addresses review comments from alex:
* remove return value from ConsumerTarget#send
* call StoredMessage.getContentSize() instead of StoredMessage.getMetaData().getContentSize()
* assume _handle in AbstractServerMessageImpl is not null
note that the last point differs from review suggestion in that _handle is not explicitly guarded against null.
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Thu Jan 5 11:24:41 2017
@@ -1311,7 +1311,7 @@ public abstract class AbstractBDBMessage
if(message.getStoredMessage() instanceof StoredBDBMessage)
{
final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage();
- final long contentSize = storedMessage.getMetaData().getContentSize();
+ final long contentSize = storedMessage.getContentSize();
_preCommitActions.add(new Runnable()
{
@Override
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Jan 5 11:24:41 2017
@@ -203,17 +203,14 @@ public abstract class AbstractConsumerTa
}
@Override
- public final long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ public final void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
- // NoAck consumer might delete the message during doSend so we must locally cache the size
- final long messageSize = entry.getMessage().getSize();
doSend(consumer, entry, batch);
if (consumer.acquires())
{
entry.makeAcquisitionStealable();
}
- return messageSize;
}
protected abstract void doSend(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Jan 5 11:24:41 2017
@@ -60,7 +60,7 @@ public interface ConsumerTarget<T extend
AMQSessionModel<?,T> getSessionModel();
- long send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
+ void send(final MessageInstanceConsumer<T> consumer, MessageInstance entry, boolean batch);
boolean sendNextMessage();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Thu Jan 5 11:24:41 2017
@@ -108,12 +108,7 @@ public abstract class AbstractServerMess
updated = _refCountUpdater.compareAndSet(this, count, -1);
if (updated)
{
- // must check if the handle is null since there may be cases where we decide to throw away a message
- // and the handle has not yet been constructed
- if (_handle != null)
- {
- _handle.remove();
- }
+ _handle.remove();
}
}
else
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Thu Jan 5 11:24:41 2017
@@ -1145,7 +1145,7 @@ public abstract class AbstractJDBCMessag
try
{
((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
- _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
+ _storeSizeIncrease += storedMessage.getContentSize();
}
catch (SQLException e)
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Thu Jan 5 11:24:41 2017
@@ -60,7 +60,7 @@ public class StoredMemoryMessage<T exten
}
else
{
- final int contentSize = _metaData.getContentSize();
+ final int contentSize = getContentSize();
int size = (contentSize < _content.position() + src.remaining())
? _content.position() + src.remaining()
: contentSize;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/serializer/v1/MessageRecord.java Thu Jan 5 11:24:41 2017
@@ -42,9 +42,9 @@ class MessageRecord implements Record
buf.dispose();
- _content = new byte[storedMessage.getMetaData().getContentSize()];
+ _content = new byte[storedMessage.getContentSize()];
buf = QpidByteBuffer.wrap(_content);
- for(QpidByteBuffer content : storedMessage.getContent(0, storedMessage.getMetaData().getContentSize()))
+ for(QpidByteBuffer content : storedMessage.getContent(0, storedMessage.getContentSize()))
{
buf.put(content);
content.dispose();
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java Thu Jan 5 11:24:41 2017
@@ -119,15 +119,13 @@ public class TestConsumerTarget implemen
{
}
- public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ public void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
- long size = entry.getMessage().getSize();
if (_messages.contains(entry))
{
entry.setRedelivered();
}
_messages.add(entry);
- return size;
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Jan 5 11:24:41 2017
@@ -379,11 +379,11 @@ abstract class AbstractQueueTestBase ext
}
@Override
- public long send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ public void send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
try
{
- return super.send(consumer, entry, batch);
+ super.send(consumer, entry, batch);
}
finally
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Thu Jan 5 11:24:41 2017
@@ -139,11 +139,10 @@ public class StandardQueueTest extends A
* @param entry
* @param batch
*/
- public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
+ public void send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
- long size = super.send(consumer, entry, batch);
+ super.send(consumer, entry, batch);
latch.countDown();
- return size;
}
};
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Thu Jan 5 11:24:41 2017
@@ -302,11 +302,11 @@ public class ServerSession extends Sessi
{
if (isTransactional() && !(_transaction instanceof DistributedTransaction))
{
- _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ _uncommittedMessageSize += handle.getContentSize();
if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
{
handle.flowToDisk();
- if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
{
getAMQPConnection().getEventLogger()
.message(_logSubject, ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Thu Jan 5 11:24:41 2017
@@ -560,11 +560,11 @@ public class AMQChannel
{
if (isTransactional())
{
- _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ _uncommittedMessageSize += handle.getContentSize();
if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
{
handle.flowToDisk();
- if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getMetaData().getContentSize())
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize == handle.getContentSize())
{
messageWithSubject(ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
}
Modified: qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java?rev=1777449&r1=1777448&r2=1777449&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java (original)
+++ qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java Thu Jan 5 11:24:41 2017
@@ -279,11 +279,11 @@ public class ProxyMessageSource implemen
}
@Override
- public long send(final MessageInstanceConsumer consumer,
+ public void send(final MessageInstanceConsumer consumer,
final MessageInstance entry,
final boolean batch)
{
- return _underlying.send(_consumer, entry, batch);
+ _underlying.send(_consumer, entry, batch);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org