You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/08/11 14:57:36 UTC

qpid-broker-j git commit: QPID-7815: [Java Broker] Invoke overflow policy check on queue maximum queue depth changes

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 17e6c7d6e -> b7ee49ded


QPID-7815: [Java Broker] Invoke overflow policy check on queue maximum queue depth changes


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b7ee49de
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b7ee49de
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b7ee49de

Branch: refs/heads/master
Commit: b7ee49ded4df66ecbb7b4836c599471659358e7e
Parents: 17e6c7d
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Aug 11 15:57:05 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Aug 11 15:57:30 2017 +0100

----------------------------------------------------------------------
 .../queue/FlowToDiskOverflowPolicyHandler.java  |  44 ++------
 ...owPolicyMaximumQueueDepthChangeListener.java |  76 +++++++++++++
 .../server/queue/RingOverflowPolicyHandler.java | 106 +++++++++++--------
 3 files changed, 148 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 9a6b354..6115408 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -21,8 +21,6 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.message.MessageDeletedException;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 
@@ -43,16 +41,22 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
 
     }
 
-    private static class Handler extends AbstractConfigurationChangeListener
+    private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
     {
         private final Queue<?> _queue;
-        private boolean _limitsChanged;
 
         private Handler(final Queue<?> queue)
         {
+            super(OverflowPolicy.FLOW_TO_DISK);
             _queue = queue;
         }
 
+        @Override
+        void onMaximumQueueDepthChange(final Queue<?> queue)
+        {
+            checkOverflow(null);
+        }
+
         private void checkOverflow(final QueueEntry newlyEnqueued)
         {
             long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
@@ -70,38 +74,6 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
             }
         }
 
-        @Override
-        public void attributeSet(final ConfiguredObject<?> object,
-                                 final String attributeName,
-                                 final Object oldAttributeValue,
-                                 final Object newAttributeValue)
-        {
-            super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
-            if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
-                || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
-            {
-                _limitsChanged = true;
-            }
-        }
-
-        @Override
-        public void bulkChangeEnd(final ConfiguredObject<?> object)
-        {
-            super.bulkChangeEnd(object);
-            if (_queue.getOverflowPolicy() == OverflowPolicy.FLOW_TO_DISK)
-            {
-                if (_limitsChanged)
-                {
-                    _limitsChanged = false;
-                    flowTailToDiskIfNecessary(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages());
-                }
-            }
-            else
-            {
-                _queue.removeChangeListener(this);
-            }
-        }
-
         private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
         {
             final long queueDepthBytes = _queue.getQueueDepthBytes();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
new file mode 100644
index 0000000..1651cef
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+
+abstract class OverflowPolicyMaximumQueueDepthChangeListener extends AbstractConfigurationChangeListener
+{
+    private final OverflowPolicy _overflowPolicy;
+    private boolean _maximumQueueDepthChangeDetected;
+
+    OverflowPolicyMaximumQueueDepthChangeListener(final OverflowPolicy overflowPolicy)
+    {
+        _overflowPolicy = overflowPolicy;
+    }
+
+    @Override
+    public void attributeSet(final ConfiguredObject<?> object,
+                             final String attributeName,
+                             final Object oldAttributeValue,
+                             final Object newAttributeValue)
+    {
+        super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
+        if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
+            || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
+        {
+            _maximumQueueDepthChangeDetected = true;
+        }
+    }
+
+    @Override
+    public void bulkChangeEnd(final ConfiguredObject<?> object)
+    {
+        super.bulkChangeEnd(object);
+        if (object instanceof Queue)
+        {
+            Queue<?> queue = (Queue<?>) object;
+
+            if (queue.getOverflowPolicy() == _overflowPolicy)
+            {
+                if (_maximumQueueDepthChangeDetected)
+                {
+                    _maximumQueueDepthChangeDetected = false;
+                    onMaximumQueueDepthChange(queue);
+                }
+            }
+            else
+            {
+                queue.removeChangeListener(this);
+            }
+        }
+    }
+
+    abstract void onMaximumQueueDepthChange(final Queue<?> queue);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 99e1c2e..d1da30b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -21,71 +21,93 @@ package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 
 public class RingOverflowPolicyHandler implements OverflowPolicyHandler
 {
-    private final Queue<?> _queue;
-    private final EventLogger _eventLogger;
+    private final Handler _handler;
 
     RingOverflowPolicyHandler(final Queue<?> queue,
                               final EventLogger eventLogger)
     {
-        _queue = queue;
-        _eventLogger = eventLogger;
+        _handler = new Handler(queue, eventLogger);
+        queue.addChangeListener(_handler);
     }
 
     @Override
     public void checkOverflow(final QueueEntry newlyEnqueued)
     {
-        final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
-        final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
-
-        boolean bytesOverflow, messagesOverflow, overflow = false;
-        int counter = 0;
-        int queueDepthMessages;
-        long queueDepthBytes;
-        do
+        _handler.checkOverflow();
+    }
+
+    private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
+    {
+        private final Queue<?> _queue;
+        private final EventLogger _eventLogger;
+
+        public Handler(final Queue<?> queue, final EventLogger eventLogger)
+        {
+            super(OverflowPolicy.RING);
+            _queue = queue;
+            _eventLogger = eventLogger;
+        }
+
+        @Override
+        void onMaximumQueueDepthChange(final Queue<?> queue)
         {
-            queueDepthMessages = _queue.getQueueDepthMessages();
-            queueDepthBytes = _queue.getQueueDepthBytes();
+            checkOverflow();
+        }
 
-            messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
-            bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
+        private void checkOverflow()
+        {
+            final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+            final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
 
-            if (bytesOverflow || messagesOverflow)
+            boolean bytesOverflow, messagesOverflow, overflow = false;
+            int counter = 0;
+            int queueDepthMessages;
+            long queueDepthBytes;
+            do
             {
-                if (!overflow)
-                {
-                    overflow = true;
-                }
+                queueDepthMessages = _queue.getQueueDepthMessages();
+                queueDepthBytes = _queue.getQueueDepthBytes();
 
-                QueueEntry entry = _queue.getLeastSignificantOldestEntry();
+                messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
+                bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
 
-                if (entry != null)
-                {
-                    counter++;
-                    _queue.deleteEntry(entry);
-                }
-                else
+                if (bytesOverflow || messagesOverflow)
                 {
-                    queueDepthMessages = _queue.getQueueDepthMessages();
-                    queueDepthBytes = _queue.getQueueDepthBytes();
-                    break;
+                    if (!overflow)
+                    {
+                        overflow = true;
+                    }
+
+                    QueueEntry entry = _queue.getLeastSignificantOldestEntry();
+
+                    if (entry != null)
+                    {
+                        counter++;
+                        _queue.deleteEntry(entry);
+                    }
+                    else
+                    {
+                        queueDepthMessages = _queue.getQueueDepthMessages();
+                        queueDepthBytes = _queue.getQueueDepthBytes();
+                        break;
+                    }
                 }
             }
-        }
-        while (bytesOverflow || messagesOverflow);
+            while (bytesOverflow || messagesOverflow);
 
-        if (overflow)
-        {
-            _eventLogger.message(_queue.getLogSubject(),
-                                 QueueMessages.DROPPED(
-                                         counter,
-                                         queueDepthBytes,
-                                         queueDepthMessages,
-                                         maximumQueueDepthBytes,
-                                         maximumQueueDepthMessages));
+            if (overflow)
+            {
+                _eventLogger.message(_queue.getLogSubject(), QueueMessages.DROPPED(counter,
+                                                                                   queueDepthBytes,
+                                                                                   queueDepthMessages,
+                                                                                   maximumQueueDepthBytes,
+                                                                                   maximumQueueDepthMessages));
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org