You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/12 16:21:57 UTC
[qpid-broker-j] branch 7.1.x updated: QPID-8341: [Broker-J] Fix
reject overflow policy
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/7.1.x by this push:
new 4dd26dd QPID-8341: [Broker-J] Fix reject overflow policy
4dd26dd is described below
commit 4dd26ddd0263fac22d91a0a16c48789592a9ccb2
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Jul 12 16:05:28 2019 +0100
QPID-8341: [Broker-J] Fix reject overflow policy
(cherry picked from commit d7b55e8b991900074a6845c325a79bfad0549768)
---
.../qpid/server/queue/RejectPolicyHandler.java | 45 ++++----
.../extensions/queue/OverflowPolicyTestBase.java | 128 +++++++++++++++++++++
.../extensions/queue/ProducerFlowControlTest.java | 76 +-----------
.../extensions/queue/RejectOverflowPolicyTest.java | 111 ++++++++++++++++++
4 files changed, 264 insertions(+), 96 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
index d81e1d2..a3584af 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
@@ -109,30 +109,31 @@ public class RejectPolicyHandler
final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
final int queueDepthMessages = _queue.getQueueDepthMessages();
final long queueDepthBytes = _queue.getQueueDepthBytes();
-
- int pendingMessages = _pendingDepthMessages.addAndGet(1);
- long pendingBytes = _pendingDepthBytes.addAndGet(newMessage.getSizeIncludingHeader());
-
- boolean messagesOverflow = maximumQueueDepthMessages >= 0
- && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
- boolean bytesOverflow = maximumQueueDepthBytes >= 0
- && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
- if (bytesOverflow || messagesOverflow)
+ final long size = newMessage.getSizeIncludingHeader();
+ if (_pendingMessages.putIfAbsent(newMessage.getStoredMessage(), size) == null)
{
- final long depthBytesDelta = -newMessage.getSizeIncludingHeader();
- _pendingDepthBytes.addAndGet(-depthBytesDelta);
- _pendingDepthMessages.addAndGet(-1);
- final String message = String.format(
- "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
- _queue.getName(),
- queueDepthMessages + pendingMessages,
- queueDepthBytes + pendingBytes,
- maximumQueueDepthMessages,
- maximumQueueDepthBytes);
- throw new MessageUnacceptableException(message);
+ int pendingMessages = _pendingDepthMessages.addAndGet(1);
+ long pendingBytes = _pendingDepthBytes.addAndGet(size);
+
+ boolean messagesOverflow = maximumQueueDepthMessages >= 0
+ && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
+ boolean bytesOverflow = maximumQueueDepthBytes >= 0
+ && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
+ if (bytesOverflow || messagesOverflow)
+ {
+ _pendingDepthBytes.addAndGet(-size);
+ _pendingDepthMessages.addAndGet(-1);
+ _pendingMessages.remove(newMessage.getStoredMessage());
+ final String message = String.format(
+ "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
+ _queue.getName(),
+ queueDepthMessages + pendingMessages,
+ queueDepthBytes + pendingBytes,
+ maximumQueueDepthMessages,
+ maximumQueueDepthBytes);
+ throw new MessageUnacceptableException(message);
+ }
}
-
- _pendingMessages.put(newMessage.getStoredMessage(), newMessage.getSizeIncludingHeader());
}
private void postEnqueue(MessageInstance instance)
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java
new file mode 100644
index 0000000..2eea890
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.systests.JmsTestBase;
+
+public class OverflowPolicyTestBase extends JmsTestBase
+{
+ private final byte[] BYTE_300 = new byte[300];
+
+
+ protected Queue createQueueWithOverflowPolicy(final String queueName,
+ final OverflowPolicy overflowPolicy,
+ final int maxQueueDepthBytes,
+ final int maxQueueDepthMessages,
+ final int resumeCapacity) throws Exception
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ if (maxQueueDepthBytes > 0)
+ {
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, maxQueueDepthBytes);
+ if (resumeCapacity > 0)
+ {
+ String flowResumeLimit = getFlowResumeLimit(maxQueueDepthBytes, resumeCapacity);
+ attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
+ String.format("{\"%s\": %s}",
+ org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
+ flowResumeLimit));
+ }
+ }
+ if (maxQueueDepthMessages > 0)
+ {
+ attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, maxQueueDepthMessages);
+ }
+ attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, overflowPolicy.name());
+ createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
+ return createQueue(queueName);
+ }
+
+ protected String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity)
+ {
+ double ratio = resumeCapacity / maximumCapacity;
+ return String.format("%.2f", ratio * 100.0);
+ }
+
+ protected int evaluateMessageSize() throws Exception
+ {
+ String tmpQueueName = getTestName() + "_Tmp";
+ Queue tmpQueue = createQueue(tmpQueueName);
+ final Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer tmpQueueProducer = session.createProducer(tmpQueue);
+ tmpQueueProducer.send(nextMessage(0, session));
+ session.commit();
+ return getQueueDepthBytes(tmpQueueName);
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ protected int getQueueDepthBytes(final String queueName) throws Exception
+ {
+ return getStatistics(queueName, "queueDepthBytes").intValue();
+ }
+
+ protected Number getStatistics(final String queueName, final String statisticsName) throws Exception
+ {
+ Map<String, Object> arguments =
+ Collections.singletonMap("statistics", Collections.singletonList(statisticsName));
+ Object statistics = performOperationUsingAmqpManagement(queueName,
+ "getStatistics",
+ "org.apache.qpid.Queue",
+ arguments);
+ assertNotNull("Statistics is null", statistics);
+ assertTrue("Statistics is not map", statistics instanceof Map);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+ assertTrue(String.format("%s is not present", statisticsName),
+ statisticsMap.get(statisticsName) instanceof Number);
+ return ((Number) statisticsMap.get(statisticsName));
+ }
+
+ protected Message nextMessage(int index, Session producerSession) throws JMSException
+ {
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
+ send.setIntProperty(INDEX, index);
+ return send;
+ }
+}
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index a3fe680..918f833 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -45,9 +43,8 @@ import javax.jms.Session;
import org.junit.Test;
import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.systests.JmsTestBase;
-public class ProducerFlowControlTest extends JmsTestBase
+public class ProducerFlowControlTest extends OverflowPolicyTestBase
{
@Test
@@ -306,27 +303,6 @@ public class ProducerFlowControlTest extends JmsTestBase
}
}
- private int getQueueDepthBytes(final String queueName) throws Exception
- {
- return getStatistics(queueName, "queueDepthBytes").intValue();
- }
-
- private Number getStatistics(final String queueName, final String statisticsName) throws Exception
- {
- Map<String, Object> arguments =
- Collections.singletonMap("statistics", Collections.singletonList(statisticsName));
- Object statistics = performOperationUsingAmqpManagement(queueName,
- "getStatistics",
- "org.apache.qpid.Queue",
- arguments);
- assertNotNull("Statistics is null", statistics);
- assertTrue("Statistics is not map", statistics instanceof Map);
- @SuppressWarnings("unchecked")
- Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
- assertTrue(String.format("%s is not present", statisticsName), statisticsMap.get(statisticsName) instanceof Number);
- return ((Number) statisticsMap.get(statisticsName));
- }
-
private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
{
final Map<String, Object> attributes = new HashMap<>();
@@ -340,12 +316,6 @@ public class ProducerFlowControlTest extends JmsTestBase
updateEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
}
- private String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity)
- {
- double ratio = resumeCapacity / maximumCapacity;
- return String.format("%.2f", ratio * 100.0);
- }
-
private boolean isFlowStopped(final String queueName) throws Exception
{
Map<String, Object> attributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", false);
@@ -358,19 +328,7 @@ public class ProducerFlowControlTest extends JmsTestBase
int resumeCapacity) throws Exception
{
- final Map<String, Object> attributes = new HashMap<>();
- if (capacity != 0)
- {
- String flowResumeLimit = getFlowResumeLimit(capacity, resumeCapacity);
- attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
- String.format("{\"%s\": %s}",
- org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
- flowResumeLimit));
- }
- attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity);
- attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL.name());
- createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
- return createQueue(queueName);
+ return createQueueWithOverflowPolicy(queueName, OverflowPolicy.PRODUCER_FLOW_CONTROL, capacity, -1, resumeCapacity);
}
private MessageSender sendMessagesAsync(final MessageProducer producer,
@@ -390,16 +348,6 @@ public class ProducerFlowControlTest extends JmsTestBase
return sender;
}
- private final byte[] BYTE_300 = new byte[300];
-
- private Message nextMessage(int msg, Session producerSession) throws JMSException
- {
- BytesMessage send = producerSession.createBytesMessage();
- send.writeBytes(BYTE_300);
- send.setIntProperty("msg", msg);
- return send;
- }
-
private boolean awaitStatisticsValue(String queueName, String statisticsName, Number expectedValue, long timeout)
throws Exception
{
@@ -467,26 +415,6 @@ public class ProducerFlowControlTest extends JmsTestBase
return found;
}
- private int evaluateMessageSize() throws Exception
- {
- String tmpQueueName = getTestName() + "_Tmp";
- Queue tmpQueue = createQueue(tmpQueueName);
- final Connection connection = getConnection();
- try
- {
- connection.start();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer tmpQueueProducer = session.createProducer(tmpQueue);
- tmpQueueProducer.send(nextMessage(0, session));
- session.commit();
- return getQueueDepthBytes(tmpQueueName);
- }
- finally
- {
- connection.close();
- }
- }
-
private class MessageSender implements Runnable
{
private final AtomicInteger _sentMessages;
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java
new file mode 100644
index 0000000..770f6d0
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static junit.framework.TestCase.fail;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.OverflowPolicy;
+
+public class RejectOverflowPolicyTest extends OverflowPolicyTestBase
+{
+
+ @Test
+ public void testMaximumQueueDepthBytesExceeded() throws Exception
+ {
+ final int messageSize = evaluateMessageSize();
+ final int maximumQueueDepthBytes = messageSize + messageSize / 2;
+ final Queue queue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.REJECT, maximumQueueDepthBytes, -1, -1);
+ verifyOverflowPolicyRejectingSecondMessage(queue);
+ }
+
+ @Test
+ public void testMaximumQueueDepthMessagesExceeded() throws Exception
+ {
+ final Queue queue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.REJECT, -1, 1, -1);
+ verifyOverflowPolicyRejectingSecondMessage(queue);
+ }
+
+ private void verifyOverflowPolicyRejectingSecondMessage(final Queue queue) throws NamingException, JMSException
+ {
+ final Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer = producerSession.createProducer(queue);
+ final Message firstMessage = nextMessage(0, producerSession);
+ final Message secondMessage = nextMessage(1, producerSession);
+
+ producer.send(firstMessage);
+ try
+ {
+ producer.send(secondMessage);
+ fail("Message send should fail due to reject policy");
+ }
+ catch (JMSException e)
+ {
+ // pass
+ }
+
+ final Connection consumerConnection = getConnection();
+ try
+ {
+ Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message);
+ assertEquals(0, message.getIntProperty(INDEX));
+
+ consumerSession.commit();
+
+ producer.send(secondMessage);
+
+ Message message2 = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message2);
+ assertEquals(1, message2.getIntProperty(INDEX));
+
+ consumerSession.commit();
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+ finally
+ {
+ producerConnection.close();
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org