You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/06/18 09:16:56 UTC
[qpid-broker-j] 03/07: QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 712942a7aeec9a46b06417c43e985179756b7183
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Jun 13 22:00:29 2019 +0100
QPID-8316: [Broker-J] Flow to disk messages with metadata loaded into direct memory
---
.../store/berkeleydb/AbstractBDBMessageStore.java | 27 ++++-
.../store/berkeleydb/BDBMessageStoreTest.java | 6 ++
.../server/message/AbstractServerMessageImpl.java | 2 +-
.../server/message/internal/InternalMessage.java | 8 +-
.../apache/qpid/server/queue/AbstractQueue.java | 24 ++---
.../qpid/server/store/StoredMemoryMessage.java | 8 +-
.../apache/qpid/server/store/StoredMessage.java | 4 +-
.../server/virtualhost/AbstractVirtualHost.java | 16 ++-
.../qpid/server/queue/AbstractQueueTestBase.java | 58 ++++++++++
.../queue/FlowToDiskOverflowPolicyHandlerTest.java | 3 +-
.../qpid/server/store/MemoryMessageStoreTest.java | 6 ++
.../qpid/server/store/MessageStoreTestCase.java | 112 +++++++++++++++++++-
.../virtualhost/FlowToDiskCheckingTaskTest.java | 117 +++++++++++++++++++++
.../v0_10/MessageConverter_Internal_to_v0_10.java | 8 +-
.../protocol/v0_10/MessageConverter_v0_10.java | 8 +-
.../v0_8/MessageConverter_Internal_to_v0_8.java | 8 +-
.../protocol/v1_0/MessageConverter_to_1_0.java | 8 +-
.../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java | 8 +-
.../v0_8_v0_10/MessageConverter_0_10_to_0_8.java | 8 +-
.../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 8 +-
.../v0_8_v1_0/MessageConverter_1_0_to_v0_8.java | 8 +-
.../server/store/derby/DerbyMessageStoreTest.java | 6 ++
.../store/jdbc/AbstractJDBCMessageStore.java | 27 ++++-
.../server/store/jdbc/JDBCMessageStoreTest.java | 6 ++
24 files changed, 451 insertions(+), 43 deletions(-)
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 6a57739..b48cd54 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1221,11 +1221,36 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
- public synchronized boolean isInMemory()
+ public synchronized boolean isInContentInMemory()
{
return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
}
+ @Override
+ public synchronized long getInMemorySize()
+ {
+ long size = 0;
+ if (_messageDataRef != null)
+ {
+ if (_messageDataRef.isHardRef())
+ {
+ size += getMetadataSize() + getContentSize();
+ }
+ else
+ {
+ if (_messageDataRef.getMetaData() != null)
+ {
+ size += getMetadataSize();
+ }
+ if (_messageDataRef.getData() != null)
+ {
+ size += getContentSize();
+ }
+ }
+ }
+ return size;
+ }
+
private boolean stored()
{
return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
index b45f6c2..d3fea6e 100644
--- a/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
+++ b/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
@@ -212,4 +212,10 @@ public class BDBMessageStoreTest extends MessageStoreTestCase
return new BDBMessageStore();
}
+ @Override
+ protected boolean flowToDiskSupported()
+ {
+ return true;
+ }
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 2f72098..ea1e8d7 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -194,7 +194,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
public QpidByteBuffer getContent(int offset, int length)
{
StoredMessage<T> storedMessage = getStoredMessage();
- boolean wasInMemory = storedMessage.isInMemory();
+ boolean wasInMemory = storedMessage.isInContentInMemory();
try
{
return storedMessage.getContent(offset, length);
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
index 75f56de..f1decc8 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java
@@ -281,12 +281,18 @@ public class InternalMessage extends AbstractServerMessageImpl<InternalMessage,
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 375732b..22e42d4 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1226,7 +1226,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
final StoredMessage storedMessage = message.getStoredMessage();
if ((_virtualHost.isOverTargetSize()
|| QpidByteBuffer.getAllocatedDirectMemorySize() > _flowToDiskThreshold)
- && storedMessage.isInMemory())
+ && storedMessage.getInMemorySize() > 0)
{
if (message.checkValid())
{
@@ -2383,26 +2383,14 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
public boolean checkValid(final QueueEntry queueEntry)
{
final ServerMessage message = queueEntry.getMessage();
- final ServerMessage.ValidationStatus validationStatus = message.getValidationStatus();
- boolean isValid = false;
- if (validationStatus == ServerMessage.ValidationStatus.UNKNOWN)
+ boolean isValid = true;
+ try (MessageReference ref = message.newReference())
{
- try (MessageReference ref = message.newReference())
- {
- isValid = message.checkValid();
- }
- catch (MessageDeletedException e)
- {
- // noop
- }
- }
- else
- {
- isValid = validationStatus == ServerMessage.ValidationStatus.VALID;
+ isValid = message.checkValid();
}
- if (!isValid)
+ catch (MessageDeletedException e)
{
- malformedEntry(queueEntry);
+ // noop
}
return isValid;
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index 1132e82..118ec44 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -117,12 +117,18 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index a1be172..c276e7b 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -41,7 +41,9 @@ public interface StoredMessage<M extends StorableMessageMetaData>
void remove();
- boolean isInMemory();
+ boolean isInContentInMemory();
+
+ long getInMemorySize();
boolean flowToDisk();
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c3b9ab3..7642ad0 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2100,7 +2100,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
- private class FlowToDiskCheckingTask extends HouseKeepingTask
+ class FlowToDiskCheckingTask extends HouseKeepingTask
{
public FlowToDiskCheckingTask()
{
@@ -2133,19 +2133,17 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
try (MessageReference messageReference = node.getMessage().newReference())
{
final StoredMessage storedMessage = messageReference.getMessage().getStoredMessage();
- if (storedMessage.isInMemory())
+ final long inMemorySize = storedMessage.getInMemorySize();
+ if (inMemorySize > 0)
{
if (cumulativeSize <= currentTargetSize)
{
- cumulativeSize += storedMessage.getContentSize();
- cumulativeSize += storedMessage.getMetadataSize();
+ cumulativeSize += inMemorySize;
}
- else
+
+ if (cumulativeSize > currentTargetSize && node.getQueue().checkValid(node))
{
- if (node.getQueue().checkValid(node))
- {
- storedMessage.flowToDisk();
- }
+ storedMessage.flowToDisk();
}
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 82b8dc8..4fcea19 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -32,6 +32,7 @@ import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
@@ -69,6 +70,7 @@ import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.RoutingResult;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
@@ -1214,6 +1216,62 @@ abstract class AbstractQueueTestBase extends UnitTestBase
(long) target.getQueueDepthMessages());
}
+ @Test
+ public void testEnqueuedMessageFlowedToDisk() throws Exception
+ {
+ makeVirtualHostTargetSizeExceeded();
+
+ final ServerMessage message2 = createMessage(1L, 2, 3);
+ final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+ final StoredMessage storedMessage = message2.getStoredMessage();
+ when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+
+ _queue.enqueue(message2, null, null);
+
+ verify(storedMessage).getInMemorySize();
+ verify(storedMessage).flowToDisk();
+
+ assertEquals("Unexpected number of messages on the queue",
+ 2,
+ _queue.getQueueDepthMessages());
+ }
+
+ @Test
+ public void testEnqueuedMalformedMessageDeleted() throws Exception
+ {
+ makeVirtualHostTargetSizeExceeded();
+
+ final ServerMessage message2 = createMessage(1L, 2, 3);
+ final long sizeIncludingHeader = message2.getSizeIncludingHeader();
+ final StoredMessage storedMessage = message2.getStoredMessage();
+ when(storedMessage.getInMemorySize()).thenReturn(sizeIncludingHeader);
+ when(message2.checkValid()).thenReturn(false);
+
+ _queue.enqueue(message2, null, null);
+
+ verify(storedMessage).getInMemorySize();
+ verify(storedMessage, never()).flowToDisk();
+
+ assertEquals("Unexpected number of messages on the queue",
+ 1,
+ _queue.getQueueDepthMessages());
+ }
+
+ private void makeVirtualHostTargetSizeExceeded()
+ {
+ final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),
+ mock(AMQMessageHeader.class),
+ "test",
+ true,
+ _qname);
+ _queue.enqueue(message, null, null);
+ assertEquals("Unexpected number of messages on the queue",
+ 1,
+ _queue.getQueueDepthMessages());
+ _virtualHost.setTargetSize(1L);
+ assertTrue(_virtualHost.isOverTargetSize());
+ }
+
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
index 8351e53..9d8f997 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandlerTest.java
@@ -134,7 +134,8 @@ public class FlowToDiskOverflowPolicyHandlerTest extends UnitTestBase
StoredMessage storedMessage = mock(StoredMessage.class);
when(message.getStoredMessage()).thenReturn(storedMessage);
- when(storedMessage.isInMemory()).thenReturn(true);
+ when(storedMessage.isInContentInMemory()).thenReturn(true);
+ when(storedMessage.getInMemorySize()).thenReturn(size);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
index f05da24..39eb426 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -42,6 +42,12 @@ public class MemoryMessageStoreTest extends MessageStoreTestCase
}
@Override
+ protected boolean flowToDiskSupported()
+ {
+ return false;
+ }
+
+ @Override
protected void reopenStore() throws Exception
{
// cannot re-open memory message store as it is not persistent
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
index 0c8a4ca..f74f60e 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreTestCase.java
@@ -21,10 +21,14 @@
package org.apache.qpid.server.store;
import static junit.framework.TestCase.assertNull;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
@@ -34,19 +38,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
-import org.hamcrest.Description;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.Transaction.EnqueueRecord;
@@ -94,6 +101,8 @@ public abstract class MessageStoreTestCase extends UnitTestBase
protected abstract MessageStore createMessageStore();
+ protected abstract boolean flowToDiskSupported();
+
protected MessageStore getStore()
{
return _store;
@@ -439,6 +448,107 @@ public abstract class MessageStoreTestCase extends UnitTestBase
verify(listener, times(1)).messageDeleted(message);
}
+ @Test
+ public void testFlowToDisk() throws Exception
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+
+ assertEquals(storedMessage.getContentSize() + storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+ assertTrue(storedMessage.flowToDisk());
+ assertEquals(0, storedMessage.getInMemorySize());
+ }
+
+ @Test
+ public void testFlowToDiskAfterMetadataReload()
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+
+ assertTrue(storedMessage.flowToDisk());
+ assertNotNull(storedMessage.getMetaData());
+ assertEquals(storedMessage.getMetadataSize(), storedMessage.getInMemorySize());
+
+ assertTrue(storedMessage.flowToDisk());
+ assertEquals(0, storedMessage.getInMemorySize());
+ }
+
+ @Test
+ public void testFlowToDiskAfterContentReload()
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+
+ assertTrue(storedMessage.flowToDisk());
+ assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+ assertEquals(storedMessage.getContentSize(), storedMessage.getInMemorySize());
+
+ assertTrue(storedMessage.flowToDisk());
+ assertEquals(0, storedMessage.getInMemorySize());
+ }
+
+
+ @Test
+ public void testIsInContentInMemoryBeforeFlowControl()
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+
+ assertTrue(storedMessage.isInContentInMemory());
+ }
+
+ @Test
+ public void testIsInContentInMemoryAfterFlowControl()
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+ assertTrue(storedMessage.flowToDisk());
+ assertFalse(storedMessage.isInContentInMemory());
+ }
+
+ @Test
+ public void testIsInContentInMemoryAfterReload()
+ {
+ assumeThat(flowToDiskSupported(), is(equalTo(true)));
+
+ final StoredMessage<?> storedMessage = createStoredMessage();
+ assertTrue(storedMessage.flowToDisk());
+ assertFalse(storedMessage.isInContentInMemory());
+ assertNotNull(storedMessage.getContent(0, storedMessage.getContentSize()));
+ assertTrue(storedMessage.isInContentInMemory());
+ }
+
+ private StoredMessage<?> createStoredMessage()
+ {
+ return createStoredMessage(Collections.singletonMap("test", "testValue"), "testContent", "testQueue");
+ }
+
+ private StoredMessage<?> createStoredMessage(final Map<String, String> headers,
+ final String content,
+ final String queueName)
+ {
+ return createInternalTestMessage(headers, content, queueName).getStoredMessage();
+ }
+
+ private InternalMessage createInternalTestMessage(final Map<String, String> headers,
+ final String content,
+ final String queueName)
+ {
+ final AMQMessageHeader messageHeader = mock(AMQMessageHeader.class);
+ if (headers != null)
+ {
+ headers.forEach((k,v) -> when(messageHeader.getHeader(k)).thenReturn(v));
+ when(messageHeader.getHeaderNames()).thenReturn(headers.keySet());
+ }
+
+ return InternalMessage.createMessage(_store, messageHeader, content, true, queueName);
+ }
+
private TransactionLogResource createTransactionLogResource(UUID queueId)
{
TransactionLogResource queue = mock(TransactionLogResource.class);
diff --git a/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
new file mode 100644
index 0000000..9e68f62
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/virtualhost/FlowToDiskCheckingTaskTest.java
@@ -0,0 +1,117 @@
+package org.apache.qpid.server.virtualhost;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class FlowToDiskCheckingTaskTest extends UnitTestBase
+{
+ private static final int FLOW_TO_DISK_CHECK_PERIOD = 0;
+ private AbstractVirtualHost<?> _virtualHost;
+ private Queue _queue;
+ private AbstractVirtualHost.FlowToDiskCheckingTask _task;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(VirtualHost.NAME, getTestName());
+ attributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+ attributes.put(VirtualHost.CONTEXT, Collections.singletonMap(QueueManagingVirtualHost.FLOW_TO_DISK_CHECK_PERIOD,
+ FLOW_TO_DISK_CHECK_PERIOD));
+ _virtualHost = (AbstractVirtualHost)BrokerTestHelper.createVirtualHost(attributes, this);
+ _task = _virtualHost. new FlowToDiskCheckingTask();
+ _queue = _virtualHost.createChild(Queue.class, Collections.singletonMap(Queue.NAME, getTestName()));
+ _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+ mock(AMQMessageHeader.class),
+ "test",
+ true,
+ _queue.getName()), null, null);
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ if (_queue != null)
+ {
+ _queue.close();
+ }
+ if (_virtualHost != null)
+ {
+ _virtualHost.close();
+ }
+ }
+
+ @Test
+ public void testFlowToDiskInMemoryMessage()
+ {
+ final ServerMessage message = createMessage(10, 20);
+ _queue.enqueue(message, null, null);
+
+ makeVirtualHostTargetSizeExceeded();
+
+ _task.execute();
+ verify(message.getStoredMessage()).flowToDisk();
+ }
+
+ private void makeVirtualHostTargetSizeExceeded()
+ {
+ if (_virtualHost.getInMemoryMessageSize() == 0)
+ {
+ _queue.enqueue(InternalMessage.createMessage(_virtualHost.getMessageStore(),
+ mock(AMQMessageHeader.class),
+ "test",
+ true,
+ _queue.getName()), null, null);
+ }
+ _virtualHost.setTargetSize(1L);
+ assertTrue(_virtualHost.isOverTargetSize());
+ }
+
+ private ServerMessage createMessage(final int headerSize, final int payloadSize)
+ {
+ long totalSize = headerSize + payloadSize;
+ final long id = System.currentTimeMillis();
+ final ServerMessage message = mock(ServerMessage.class);
+ when(message.getMessageNumber()).thenReturn(id);
+ when(message.getMessageHeader()).thenReturn(mock(AMQMessageHeader.class));
+ when(message.checkValid()).thenReturn(true);
+ when(message.getSizeIncludingHeader()).thenReturn(totalSize);
+ when(message.getValidationStatus()).thenReturn(ServerMessage.ValidationStatus.UNKNOWN);
+
+ final StoredMessage storedMessage = mock(StoredMessage.class);
+ when(storedMessage.getContentSize()).thenReturn(payloadSize);
+ when(storedMessage.getMetadataSize()).thenReturn(headerSize);
+ when(storedMessage.getInMemorySize()).thenReturn(totalSize);
+ when(message.getStoredMessage()).thenReturn(storedMessage);
+
+ final MessageReference ref = mock(MessageReference.class);
+ when(ref.getMessage()).thenReturn(message);
+
+ when(message.newReference()).thenReturn(ref);
+ when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
+ return message;
+ }
+}
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
index 114b0f2..5a27e89 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_Internal_to_v0_10.java
@@ -129,12 +129,18 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter<Inte
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 8c9db87..10e76bd 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -105,12 +105,18 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index b362b2a..1196d38 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -132,12 +132,18 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index 71c3b06..a9914c4 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -443,12 +443,18 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
index 07aeb95..83bc923 100644
--- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -149,12 +149,18 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
index 59ab6b0..ab9afa6 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -274,12 +274,18 @@ public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTra
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getMetadataSize() + getContentSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
index 6d9c190..68c96c9 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -115,12 +115,18 @@ public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessag
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
index 3c6058f..738bd9a 100644
--- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
+++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -140,12 +140,18 @@ public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_
}
@Override
- public boolean isInMemory()
+ public boolean isInContentInMemory()
{
return true;
}
@Override
+ public long getInMemorySize()
+ {
+ return getContentSize() + getMetadataSize();
+ }
+
+ @Override
public boolean flowToDisk()
{
return false;
diff --git a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
index 336da71..da203bb 100644
--- a/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
+++ b/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreTest.java
@@ -109,4 +109,10 @@ public class DerbyMessageStoreTest extends MessageStoreTestCase
return new DerbyMessageStore();
}
+ @Override
+ protected boolean flowToDiskSupported()
+ {
+ return true;
+ }
+
}
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index b2359b7..df02e55 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1637,11 +1637,36 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
- public synchronized boolean isInMemory()
+ public synchronized boolean isInContentInMemory()
{
return _messageDataRef != null && (_messageDataRef.isHardRef() || _messageDataRef.getData() != null);
}
+ @Override
+ public synchronized long getInMemorySize()
+ {
+ long size = 0;
+ if (_messageDataRef != null)
+ {
+ if (_messageDataRef.isHardRef())
+ {
+ size += getMetadataSize() + getContentSize();
+ }
+ else
+ {
+ if (_messageDataRef.getMetaData() != null)
+ {
+ size += getMetadataSize();
+ }
+ if (_messageDataRef.getData() != null)
+ {
+ size += getContentSize();
+ }
+ }
+ }
+ return size;
+ }
+
private boolean stored()
{
return _messageDataRef != null && !_messageDataRef.isHardRef();
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
index a152337..9f12062 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/JDBCMessageStoreTest.java
@@ -323,6 +323,12 @@ public class JDBCMessageStoreTest extends MessageStoreTestCase
return new GenericJDBCMessageStore();
}
+ @Override
+ protected boolean flowToDiskSupported()
+ {
+ return true;
+ }
+
private Connection openConnection() throws SQLException
{
return TestJdbcUtils.openConnection(_connectionURL);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org