You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/08/11 14:57:36 UTC
qpid-broker-j git commit: QPID-7815: [Java Broker] Invoke overflow
policy check on queue maximum queue depth changes
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 17e6c7d6e -> b7ee49ded
QPID-7815: [Java Broker] Invoke overflow policy check on queue maximum queue depth changes
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b7ee49de
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b7ee49de
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b7ee49de
Branch: refs/heads/master
Commit: b7ee49ded4df66ecbb7b4836c599471659358e7e
Parents: 17e6c7d
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Aug 11 15:57:05 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Aug 11 15:57:30 2017 +0100
----------------------------------------------------------------------
.../queue/FlowToDiskOverflowPolicyHandler.java | 44 ++------
...owPolicyMaximumQueueDepthChangeListener.java | 76 +++++++++++++
.../server/queue/RingOverflowPolicyHandler.java | 106 +++++++++++--------
3 files changed, 148 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
index 9a6b354..6115408 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/FlowToDiskOverflowPolicyHandler.java
@@ -21,8 +21,6 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.MessageDeletedException;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
-import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
@@ -43,16 +41,22 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
}
- private static class Handler extends AbstractConfigurationChangeListener
+ private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
{
private final Queue<?> _queue;
- private boolean _limitsChanged;
private Handler(final Queue<?> queue)
{
+ super(OverflowPolicy.FLOW_TO_DISK);
_queue = queue;
}
+ @Override
+ void onMaximumQueueDepthChange(final Queue<?> queue)
+ {
+ checkOverflow(null);
+ }
+
private void checkOverflow(final QueueEntry newlyEnqueued)
{
long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
@@ -70,38 +74,6 @@ public class FlowToDiskOverflowPolicyHandler implements OverflowPolicyHandler
}
}
- @Override
- public void attributeSet(final ConfiguredObject<?> object,
- final String attributeName,
- final Object oldAttributeValue,
- final Object newAttributeValue)
- {
- super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
- if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
- || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
- {
- _limitsChanged = true;
- }
- }
-
- @Override
- public void bulkChangeEnd(final ConfiguredObject<?> object)
- {
- super.bulkChangeEnd(object);
- if (_queue.getOverflowPolicy() == OverflowPolicy.FLOW_TO_DISK)
- {
- if (_limitsChanged)
- {
- _limitsChanged = false;
- flowTailToDiskIfNecessary(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages());
- }
- }
- else
- {
- _queue.removeChangeListener(this);
- }
- }
-
private void flowTailToDiskIfNecessary(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
{
final long queueDepthBytes = _queue.getQueueDepthBytes();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
new file mode 100644
index 0000000..1651cef
--- /dev/null
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyMaximumQueueDepthChangeListener.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+
+abstract class OverflowPolicyMaximumQueueDepthChangeListener extends AbstractConfigurationChangeListener
+{
+ private final OverflowPolicy _overflowPolicy;
+ private boolean _maximumQueueDepthChangeDetected;
+
+ OverflowPolicyMaximumQueueDepthChangeListener(final OverflowPolicy overflowPolicy)
+ {
+ _overflowPolicy = overflowPolicy;
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
+ if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
+ || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
+ {
+ _maximumQueueDepthChangeDetected = true;
+ }
+ }
+
+ @Override
+ public void bulkChangeEnd(final ConfiguredObject<?> object)
+ {
+ super.bulkChangeEnd(object);
+ if (object instanceof Queue)
+ {
+ Queue<?> queue = (Queue<?>) object;
+
+ if (queue.getOverflowPolicy() == _overflowPolicy)
+ {
+ if (_maximumQueueDepthChangeDetected)
+ {
+ _maximumQueueDepthChangeDetected = false;
+ onMaximumQueueDepthChange(queue);
+ }
+ }
+ else
+ {
+ queue.removeChangeListener(this);
+ }
+ }
+ }
+
+ abstract void onMaximumQueueDepthChange(final Queue<?> queue);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b7ee49de/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 99e1c2e..d1da30b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -21,71 +21,93 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
public class RingOverflowPolicyHandler implements OverflowPolicyHandler
{
- private final Queue<?> _queue;
- private final EventLogger _eventLogger;
+ private final Handler _handler;
RingOverflowPolicyHandler(final Queue<?> queue,
final EventLogger eventLogger)
{
- _queue = queue;
- _eventLogger = eventLogger;
+ _handler = new Handler(queue, eventLogger);
+ queue.addChangeListener(_handler);
}
@Override
public void checkOverflow(final QueueEntry newlyEnqueued)
{
- final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
- final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
-
- boolean bytesOverflow, messagesOverflow, overflow = false;
- int counter = 0;
- int queueDepthMessages;
- long queueDepthBytes;
- do
+ _handler.checkOverflow();
+ }
+
+ private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
+ {
+ private final Queue<?> _queue;
+ private final EventLogger _eventLogger;
+
+ public Handler(final Queue<?> queue, final EventLogger eventLogger)
+ {
+ super(OverflowPolicy.RING);
+ _queue = queue;
+ _eventLogger = eventLogger;
+ }
+
+ @Override
+ void onMaximumQueueDepthChange(final Queue<?> queue)
{
- queueDepthMessages = _queue.getQueueDepthMessages();
- queueDepthBytes = _queue.getQueueDepthBytes();
+ checkOverflow();
+ }
- messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
- bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
+ private void checkOverflow()
+ {
+ final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+ final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
- if (bytesOverflow || messagesOverflow)
+ boolean bytesOverflow, messagesOverflow, overflow = false;
+ int counter = 0;
+ int queueDepthMessages;
+ long queueDepthBytes;
+ do
{
- if (!overflow)
- {
- overflow = true;
- }
+ queueDepthMessages = _queue.getQueueDepthMessages();
+ queueDepthBytes = _queue.getQueueDepthBytes();
- QueueEntry entry = _queue.getLeastSignificantOldestEntry();
+ messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
+ bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
- if (entry != null)
- {
- counter++;
- _queue.deleteEntry(entry);
- }
- else
+ if (bytesOverflow || messagesOverflow)
{
- queueDepthMessages = _queue.getQueueDepthMessages();
- queueDepthBytes = _queue.getQueueDepthBytes();
- break;
+ if (!overflow)
+ {
+ overflow = true;
+ }
+
+ QueueEntry entry = _queue.getLeastSignificantOldestEntry();
+
+ if (entry != null)
+ {
+ counter++;
+ _queue.deleteEntry(entry);
+ }
+ else
+ {
+ queueDepthMessages = _queue.getQueueDepthMessages();
+ queueDepthBytes = _queue.getQueueDepthBytes();
+ break;
+ }
}
}
- }
- while (bytesOverflow || messagesOverflow);
+ while (bytesOverflow || messagesOverflow);
- if (overflow)
- {
- _eventLogger.message(_queue.getLogSubject(),
- QueueMessages.DROPPED(
- counter,
- queueDepthBytes,
- queueDepthMessages,
- maximumQueueDepthBytes,
- maximumQueueDepthMessages));
+ if (overflow)
+ {
+ _eventLogger.message(_queue.getLogSubject(), QueueMessages.DROPPED(counter,
+ queueDepthBytes,
+ queueDepthMessages,
+ maximumQueueDepthBytes,
+ maximumQueueDepthMessages));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org