You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/12 16:21:57 UTC

[qpid-broker-j] branch 7.1.x updated: QPID-8341: [Broker-J] Fix reject overflow policy

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/7.1.x by this push:
     new 4dd26dd  QPID-8341: [Broker-J] Fix reject overflow policy
4dd26dd is described below

commit 4dd26ddd0263fac22d91a0a16c48789592a9ccb2
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Jul 12 16:05:28 2019 +0100

    QPID-8341: [Broker-J] Fix reject overflow policy
    
    (cherry picked from commit d7b55e8b991900074a6845c325a79bfad0549768)
---
 .../qpid/server/queue/RejectPolicyHandler.java     |  45 ++++----
 .../extensions/queue/OverflowPolicyTestBase.java   | 128 +++++++++++++++++++++
 .../extensions/queue/ProducerFlowControlTest.java  |  76 +-----------
 .../extensions/queue/RejectOverflowPolicyTest.java | 111 ++++++++++++++++++
 4 files changed, 264 insertions(+), 96 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
index d81e1d2..a3584af 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RejectPolicyHandler.java
@@ -109,30 +109,31 @@ public class RejectPolicyHandler
             final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
             final int queueDepthMessages = _queue.getQueueDepthMessages();
             final long queueDepthBytes = _queue.getQueueDepthBytes();
-
-            int pendingMessages = _pendingDepthMessages.addAndGet(1);
-            long pendingBytes = _pendingDepthBytes.addAndGet(newMessage.getSizeIncludingHeader());
-
-            boolean messagesOverflow = maximumQueueDepthMessages >= 0
-                                       && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
-            boolean bytesOverflow = maximumQueueDepthBytes >= 0
-                                    && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
-            if (bytesOverflow || messagesOverflow)
+            final long size = newMessage.getSizeIncludingHeader();
+            if (_pendingMessages.putIfAbsent(newMessage.getStoredMessage(), size) == null)
             {
-                final long depthBytesDelta = -newMessage.getSizeIncludingHeader();
-                _pendingDepthBytes.addAndGet(-depthBytesDelta);
-                _pendingDepthMessages.addAndGet(-1);
-                final String message = String.format(
-                        "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
-                        _queue.getName(),
-                        queueDepthMessages + pendingMessages,
-                        queueDepthBytes + pendingBytes,
-                        maximumQueueDepthMessages,
-                        maximumQueueDepthBytes);
-                throw new MessageUnacceptableException(message);
+                int pendingMessages = _pendingDepthMessages.addAndGet(1);
+                long pendingBytes = _pendingDepthBytes.addAndGet(size);
+
+                boolean messagesOverflow = maximumQueueDepthMessages >= 0
+                                           && queueDepthMessages + pendingMessages > maximumQueueDepthMessages;
+                boolean bytesOverflow = maximumQueueDepthBytes >= 0
+                                        && queueDepthBytes + pendingBytes > maximumQueueDepthBytes;
+                if (bytesOverflow || messagesOverflow)
+                {
+                    _pendingDepthBytes.addAndGet(-size);
+                    _pendingDepthMessages.addAndGet(-1);
+                    _pendingMessages.remove(newMessage.getStoredMessage());
+                    final String message = String.format(
+                            "Maximum depth exceeded on '%s' : current=[count: %d, size: %d], max=[count: %d, size: %d]",
+                            _queue.getName(),
+                            queueDepthMessages + pendingMessages,
+                            queueDepthBytes + pendingBytes,
+                            maximumQueueDepthMessages,
+                            maximumQueueDepthBytes);
+                    throw new MessageUnacceptableException(message);
+                }
             }
-
-            _pendingMessages.put(newMessage.getStoredMessage(), newMessage.getSizeIncludingHeader());
         }
 
         private void postEnqueue(MessageInstance instance)
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java
new file mode 100644
index 0000000..2eea890
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/OverflowPolicyTestBase.java
@@ -0,0 +1,128 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.systests.JmsTestBase;
+
+public class OverflowPolicyTestBase extends JmsTestBase
+{
+    private final byte[] BYTE_300 = new byte[300];
+
+
+    protected Queue createQueueWithOverflowPolicy(final String queueName,
+                                                  final OverflowPolicy overflowPolicy,
+                                                  final int maxQueueDepthBytes,
+                                                  final int maxQueueDepthMessages,
+                                                  final int resumeCapacity) throws Exception
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        if (maxQueueDepthBytes > 0)
+        {
+            attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, maxQueueDepthBytes);
+            if (resumeCapacity > 0)
+            {
+                String flowResumeLimit = getFlowResumeLimit(maxQueueDepthBytes, resumeCapacity);
+                attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
+                               String.format("{\"%s\": %s}",
+                                             org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
+                                             flowResumeLimit));
+            }
+        }
+        if (maxQueueDepthMessages > 0)
+        {
+            attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, maxQueueDepthMessages);
+        }
+        attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, overflowPolicy.name());
+        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
+        return createQueue(queueName);
+    }
+
+    protected String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity)
+    {
+        // Resume limit as a percentage of capacity; Locale.ROOT keeps '.' as the decimal separator (valid JSON).
+        return String.format(java.util.Locale.ROOT, "%.2f", resumeCapacity / maximumCapacity * 100.0);
+    }
+
+    protected int evaluateMessageSize() throws Exception
+    {
+        String tmpQueueName = getTestName() + "_Tmp";
+        Queue tmpQueue = createQueue(tmpQueueName);
+        final Connection connection = getConnection();
+        try
+        {
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer tmpQueueProducer = session.createProducer(tmpQueue);
+            tmpQueueProducer.send(nextMessage(0, session));
+            session.commit();
+            return getQueueDepthBytes(tmpQueueName);
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    protected int getQueueDepthBytes(final String queueName) throws Exception
+    {
+        return getStatistics(queueName, "queueDepthBytes").intValue();
+    }
+
+    protected Number getStatistics(final String queueName, final String statisticsName) throws Exception
+    {
+        Map<String, Object> arguments =
+                Collections.singletonMap("statistics", Collections.singletonList(statisticsName));
+        Object statistics = performOperationUsingAmqpManagement(queueName,
+                                                                "getStatistics",
+                                                                "org.apache.qpid.Queue",
+                                                                arguments);
+        assertNotNull("Statistics is null", statistics);
+        assertTrue("Statistics is not map", statistics instanceof Map);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
+        assertTrue(String.format("%s is not present", statisticsName),
+                   statisticsMap.get(statisticsName) instanceof Number);
+        return ((Number) statisticsMap.get(statisticsName));
+    }
+
+    protected Message nextMessage(int index, Session producerSession) throws JMSException
+    {
+        BytesMessage send = producerSession.createBytesMessage();
+        send.writeBytes(BYTE_300);
+        send.setIntProperty(INDEX, index);
+        return send;
+    }
+}
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index a3fe680..918f833 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -45,9 +43,8 @@ import javax.jms.Session;
 import org.junit.Test;
 
 import org.apache.qpid.server.model.OverflowPolicy;
-import org.apache.qpid.systests.JmsTestBase;
 
-public class ProducerFlowControlTest extends JmsTestBase
+public class ProducerFlowControlTest extends OverflowPolicyTestBase
 {
 
     @Test
@@ -306,27 +303,6 @@ public class ProducerFlowControlTest extends JmsTestBase
         }
     }
 
-    private int getQueueDepthBytes(final String queueName) throws Exception
-    {
-        return getStatistics(queueName, "queueDepthBytes").intValue();
-    }
-
-    private Number getStatistics(final String queueName, final String statisticsName) throws Exception
-    {
-        Map<String, Object> arguments =
-                Collections.singletonMap("statistics", Collections.singletonList(statisticsName));
-        Object statistics = performOperationUsingAmqpManagement(queueName,
-                                                                "getStatistics",
-                                                                "org.apache.qpid.Queue",
-                                                                arguments);
-        assertNotNull("Statistics is null", statistics);
-        assertTrue("Statistics is not map", statistics instanceof Map);
-        @SuppressWarnings("unchecked")
-        Map<String, Object> statisticsMap = (Map<String, Object>) statistics;
-        assertTrue(String.format("%s is not present", statisticsName), statisticsMap.get(statisticsName) instanceof Number);
-        return ((Number) statisticsMap.get(statisticsName));
-    }
-
     private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
     {
         final Map<String, Object> attributes = new HashMap<>();
@@ -340,12 +316,6 @@ public class ProducerFlowControlTest extends JmsTestBase
         updateEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
     }
 
-    private String getFlowResumeLimit(final double maximumCapacity, final double resumeCapacity)
-    {
-        double ratio = resumeCapacity / maximumCapacity;
-        return String.format("%.2f", ratio * 100.0);
-    }
-
     private boolean isFlowStopped(final String queueName) throws Exception
     {
         Map<String, Object> attributes = readEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", false);
@@ -358,19 +328,7 @@ public class ProducerFlowControlTest extends JmsTestBase
                                                            int resumeCapacity) throws Exception
     {
 
-        final Map<String, Object> attributes = new HashMap<>();
-        if (capacity != 0)
-        {
-            String flowResumeLimit = getFlowResumeLimit(capacity, resumeCapacity);
-            attributes.put(org.apache.qpid.server.model.Queue.CONTEXT,
-                           String.format("{\"%s\": %s}",
-                                         org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_LIMIT,
-                                         flowResumeLimit));
-        }
-        attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_QUEUE_DEPTH_BYTES, capacity);
-        attributes.put(org.apache.qpid.server.model.Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL.name());
-        createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", attributes);
-        return createQueue(queueName);
+       return createQueueWithOverflowPolicy(queueName, OverflowPolicy.PRODUCER_FLOW_CONTROL, capacity, -1, resumeCapacity);
     }
 
     private MessageSender sendMessagesAsync(final MessageProducer producer,
@@ -390,16 +348,6 @@ public class ProducerFlowControlTest extends JmsTestBase
         return sender;
     }
 
-    private final byte[] BYTE_300 = new byte[300];
-
-    private Message nextMessage(int msg, Session producerSession) throws JMSException
-    {
-        BytesMessage send = producerSession.createBytesMessage();
-        send.writeBytes(BYTE_300);
-        send.setIntProperty("msg", msg);
-        return send;
-    }
-
     private boolean awaitStatisticsValue(String queueName, String statisticsName, Number expectedValue, long timeout)
             throws Exception
     {
@@ -467,26 +415,6 @@ public class ProducerFlowControlTest extends JmsTestBase
         return found;
     }
 
-    private int evaluateMessageSize() throws Exception
-    {
-        String tmpQueueName = getTestName() + "_Tmp";
-        Queue tmpQueue = createQueue(tmpQueueName);
-        final Connection connection = getConnection();
-        try
-        {
-            connection.start();
-            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-            MessageProducer tmpQueueProducer = session.createProducer(tmpQueue);
-            tmpQueueProducer.send(nextMessage(0, session));
-            session.commit();
-            return getQueueDepthBytes(tmpQueueName);
-        }
-        finally
-        {
-            connection.close();
-        }
-    }
-
     private class MessageSender implements Runnable
     {
         private final AtomicInteger _sentMessages;
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java
new file mode 100644
index 0000000..770f6d0
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/RejectOverflowPolicyTest.java
@@ -0,0 +1,111 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static junit.framework.TestCase.fail;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.OverflowPolicy;
+
+public class RejectOverflowPolicyTest extends OverflowPolicyTestBase
+{
+
+    @Test
+    public void testMaximumQueueDepthBytesExceeded() throws Exception
+    {
+        final int messageSize = evaluateMessageSize();
+        final int maximumQueueDepthBytes = messageSize + messageSize / 2;
+        final Queue queue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.REJECT, maximumQueueDepthBytes, -1, -1);
+        verifyOverflowPolicyRejectingSecondMessage(queue);
+    }
+
+    @Test
+    public void testMaximumQueueDepthMessagesExceeded() throws Exception
+    {
+        final Queue queue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.REJECT, -1, 1, -1);
+        verifyOverflowPolicyRejectingSecondMessage(queue);
+    }
+
+    private void verifyOverflowPolicyRejectingSecondMessage(final Queue queue) throws NamingException, JMSException
+    {
+        final Connection producerConnection = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producerSession.createProducer(queue);
+            final Message firstMessage = nextMessage(0, producerSession);
+            final Message secondMessage = nextMessage(1, producerSession);
+
+            producer.send(firstMessage);
+            try
+            {
+                producer.send(secondMessage);
+                fail("Message send should fail due to reject policy");
+            }
+            catch (JMSException e)
+            {
+                // pass
+            }
+
+            final Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+                MessageConsumer consumer = consumerSession.createConsumer(queue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message);
+                assertEquals(0, message.getIntProperty(INDEX));
+
+                consumerSession.commit();
+
+                producer.send(secondMessage);
+
+                Message message2 = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message2);
+                assertEquals(1, message2.getIntProperty(INDEX));
+
+                consumerSession.commit();
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection.close();
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org