You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/05 18:06:55 UTC
svn commit: r1683790 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/logging/messages/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/protocol/
broker-core/src/main/java/...
Author: rgodfrey
Date: Fri Jun 5 16:06:55 2015
New Revision: 1683790
URL: http://svn.apache.org/r1683790
Log:
QPID-6575 : Log subscription state change message (SUB-1003) only when a subscription has been suspended for an unusually long period of time
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java (with props)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
qpid/java/trunk/test-profiles/Java010Excludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java Fri Jun 5 16:06:55 2015
@@ -61,12 +61,12 @@ public class SubscriptionMessages
/**
* Log a Subscription message of the Format:
- * <pre>SUB-1003 : State : {0}</pre>
+ * <pre>SUB-1003 : Suspended for {0,number} ms</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage STATE(String param1)
+ public static LogMessage STATE(Number param1)
{
String rawMessage = _messages.getString("STATE");
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties Fri Jun 5 16:06:55 2015
@@ -21,4 +21,4 @@
CREATE = SUB-1001 : Create[ : Durable][ : Arguments : {0}]
CLOSE = SUB-1002 : Close
# 0 - The current subscription state
-STATE = SUB-1003 : State : {0}
\ No newline at end of file
+STATE = SUB-1003 : Suspended for {0,number} ms
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Fri Jun 5 16:06:55 2015
@@ -29,6 +29,11 @@ public interface Consumer<X extends Cons
public String SELECTOR = "selector";
public String SETTLEMENT_MODE = "settlementMode";
+ String SUSPEND_NOTIFICATION_PERIOD = "consumer.suspendNotificationPeriod";
+
+ @ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
+ long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
+
@ManagedAttribute
String getDistributionMode();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Jun 5 16:06:55 2015
@@ -109,4 +109,6 @@ public interface AMQConnectionModel<T ex
boolean isMessageAssignmentSuspended();
+ ServerProtocolEngine getProtocolEngine();
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Jun 5 16:06:55 2015
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Cons
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.transport.network.Ticker;
/**
* Session model interface.
@@ -117,4 +118,7 @@ public interface AMQSessionModel<T exten
void transportStateChanged();
void processPending();
+
+ void addTicker(Ticker ticker);
+ void removeTicker(Ticker ticker);
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Jun 5 16:06:55 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.State;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.transport.network.AggregateTicker;
class QueueConsumerImpl
extends AbstractConfiguredObject<QueueConsumerImpl>
@@ -72,6 +74,8 @@ class QueueConsumerImpl
private final Object _sessionReference;
private final AbstractQueue _queue;
+ private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
+
static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
@@ -89,7 +93,7 @@ class QueueConsumerImpl
{
public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
{
- getEventLogger().message(QueueConsumerImpl.this, SubscriptionMessages.STATE(newState.toString()));
+ // no-op
}
};
@ManagedAttributeField
@@ -103,7 +107,7 @@ class QueueConsumerImpl
@ManagedAttributeField
private String _selector;
- QueueConsumerImpl(final AbstractQueue queue,
+ QueueConsumerImpl(final AbstractQueue<?> queue,
ConsumerTarget target, final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
@@ -139,6 +143,15 @@ class QueueConsumerImpl
}
};
_target.addStateListener(_listener);
+
+ _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+ {
+ @Override
+ protected void log(final long period)
+ {
+ getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
+ }
+ };
}
@Override
@@ -185,9 +198,16 @@ class QueueConsumerImpl
getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
}
}
+
+
+ if(newState == ConsumerTarget.State.SUSPENDED)
+ {
+ _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+ getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+ }
else
{
- getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(newState.toString()));
+ getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
}
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1683790&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java Fri Jun 5 16:06:55 2015
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.transport.network.Ticker;
+
+abstract public class SuspendedConsumerLoggingTicker implements Ticker // Ticker that calls log(elapsed) each time a repeat period has passed since setStartTime
+{
+ private long _nextTick; // time at which the next log(...) call becomes due
+ private long _startTime; // reference time recorded by setStartTime; elapsed time is measured from it
+ private final long _repeatPeriod; // interval between successive log(...) calls, in the same units as the time arguments
+
+ public SuspendedConsumerLoggingTicker(final long repeatPeriod) // repeatPeriod: interval between log calls
+ {
+ _repeatPeriod = repeatPeriod;
+ }
+
+ public void setStartTime(final long currentTime) // (re)arms the ticker relative to currentTime
+ {
+ _startTime = currentTime;
+ _nextTick = currentTime + _repeatPeriod; // first log is due one full period after the start time
+ }
+
+ @Override
+ public int getTimeToNextTick(final long currentTime) // returns the time remaining until the next log is due; <= 0 means it is due now
+ {
+ return (int) (_nextTick - currentTime); // NOTE(review): long difference is narrowed to int - confirm the gap can never exceed the int range
+ }
+
+ @Override
+ public int tick(final long currentTime) // logs if a period has elapsed, then returns the time remaining until the following log
+ {
+ int nextTick = getTimeToNextTick(currentTime);
+ if(nextTick <= 0) // the scheduled time has been reached or passed
+ {
+ log(currentTime - _startTime); // report the total time elapsed since setStartTime
+ _nextTick = _nextTick + _repeatPeriod; // advances by exactly one period; NOTE(review): if several periods were missed the recomputed value below can still be <= 0
+ nextTick = getTimeToNextTick(currentTime);
+ }
+ return nextTick;
+ }
+
+ abstract protected void log(long period); // period: elapsed time since setStartTime, as computed by tick()
+}
Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Fri Jun 5 16:06:55 2015
@@ -53,11 +53,13 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.transport.network.Ticker;
public class MockConsumer implements ConsumerTarget
{
@@ -507,6 +509,18 @@ public class MockConsumer implements Con
{
}
+
+ @Override
+ public void addTicker(final Ticker ticker) // no-op: this mock session does not register the ticker anywhere
+ {
+
+ }
+
+ @Override
+ public void removeTicker(final Ticker ticker) // no-op: this mock session holds no tickers to remove
+ {
+
+ }
}
private static class MockConnectionModel implements AMQConnectionModel
@@ -645,6 +659,12 @@ public class MockConsumer implements Con
}
@Override
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return null;
+ }
+
+ @Override
public String getClientVersion()
{
return null;
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java Fri Jun 5 16:06:55 2015
@@ -72,15 +72,5 @@ public class ConsumerMessagesTest extend
validateLogMessage(log, "SUB-1002", expected);
}
- public void testSubscriptionState()
- {
- String state = "ACTIVE";
- _logMessage = SubscriptionMessages.STATE(state);
- List<Object> log = performLog();
-
- String[] expected = {"State :", state};
-
- validateLogMessage(log, "SUB-1003", expected);
- }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Jun 5 16:06:55 2015
@@ -207,6 +207,7 @@ public class ServerConnection extends Co
super.setConnectionDelegate(delegate);
}
+ @Override
public ServerProtocolEngine getProtocolEngine()
{
return _serverProtocolEngine;
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Jun 5 16:06:55 2015
@@ -109,6 +109,7 @@ import org.apache.qpid.transport.RangeSe
import org.apache.qpid.transport.Session;
import org.apache.qpid.transport.SessionDelegate;
import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.network.Ticker;
public class ServerSession extends Session
implements AuthorizationHolder,
@@ -1161,6 +1162,18 @@ public class ServerSession extends Sessi
}
}
+ @Override
+ public void addTicker(final Ticker ticker) // registers the ticker with the aggregate ticker of this session's connection protocol engine
+ {
+ getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+ }
+
+ @Override
+ public void removeTicker(final Ticker ticker) // removes the ticker from the aggregate ticker of this session's connection protocol engine
+ {
+ getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+ }
+
public final long getMaxUncommittedInMemorySize()
{
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jun 5 16:06:55 2015
@@ -114,6 +114,7 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
public class AMQChannel
implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
@@ -3678,4 +3679,16 @@ public class AMQChannel
target.processPending();
}
}
+
+ @Override
+ public void addTicker(final Ticker ticker) // registers the ticker with the aggregate ticker obtained from this channel's connection
+ {
+ getConnection().getAggregateTicker().addTicker(ticker);
+ }
+
+ @Override
+ public void removeTicker(final Ticker ticker) // removes the ticker from the aggregate ticker obtained from this channel's connection
+ {
+ getConnection().getAggregateTicker().removeTicker(ticker);
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Jun 5 16:06:55 2015
@@ -272,6 +272,12 @@ public class AMQProtocolEngine implement
}
@Override
+ public ServerProtocolEngine getProtocolEngine()
+ {
+ return this;
+ }
+
+ @Override
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Jun 5 16:06:55 2015
@@ -394,6 +394,7 @@ public class Connection_1_0 implements C
return _port;
}
+ @Override
public ServerProtocolEngine getProtocolEngine()
{
return _protocolEngine;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Jun 5 16:06:55 2015
@@ -90,6 +90,7 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.network.Ticker;
public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
{
@@ -913,6 +914,18 @@ public class Session_1_0 implements Sess
}
}
+ @Override
+ public void addTicker(final Ticker ticker) // registers the ticker with the aggregate ticker of this session's connection protocol engine
+ {
+ getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+ }
+
+ @Override
+ public void removeTicker(final Ticker ticker) // removes the ticker from the aggregate ticker of this session's connection protocol engine
+ {
+ getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java Fri Jun 5 16:06:55 2015
@@ -34,6 +34,7 @@ import javax.jms.Topic;
import junit.framework.AssertionFailedError;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.server.model.Consumer;
/**
* Subscription
@@ -58,6 +59,7 @@ public class ConsumerLoggingTest extends
@Override
public void setUp() throws Exception
{
+ setSystemProperty(Consumer.SUSPEND_NOTIFICATION_PERIOD, "100");
super.setUp();
//Remove broker startup logging messages
_monitor.markDiscardPoint();
@@ -319,89 +321,26 @@ public class ConsumerLoggingTest extends
int SEND_COUNT = 16;
sendMessage(_session, _queue, SEND_COUNT);
_session.commit();
+
+ Thread.sleep(2000l);
+
+ List<String> results = waitAndFindMatches("SUB-1003");
+
+ assertTrue("Expected at least two suspension messages, but got " + results.size(), results.size() >= 2);
+
// Retreive the first message, and start the flow of messages
Message msg = consumer.receive(1000);
- assertNotNull("First message not retrieved", msg);
+ assertNotNull("Message not retrieved", msg);
_session.commit();
-
- // Drain the queue to ensure there is time for the ACTIVE log message
- // Check that we can received all the messages
- int receivedCount = 0;
- while (msg != null)
- {
- receivedCount++;
- msg = consumer.receive(1000);
- _session.commit();
- }
-
- //Validate we received all the messages
- assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
-
- // Fill the queue again to suspend the consumer
- sendMessage(_session, _queue, SEND_COUNT);
+ msg = consumer.receive(1000);
+ assertNotNull("Message not retrieved", msg);
_session.commit();
- //Validate
- List<String> results = waitAndFindMatches("SUB-1003");
+ int count = waitAndFindMatches("SUB-1003").size();
+ Thread.sleep(2000l);
+ assertEquals("More suspension messages were received unexpectedly", count, waitAndFindMatches("SUB-1003").size());
- try
- {
- // Validation expects three messages.
- // The Actor can be any one of the following depending on the exactly what is going on on the broker.
- // Ideally we would test that we can get all of them but setting up
- // the timing to do this in a consistent way is not beneficial.
- // Ensuring the State is as expected is sufficient.
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
-
- assertTrue("Result set not expected size:", 3 <= results.size());
-
- // Validate Initial Suspension
- String expectedState = "SUSPENDED";
- String log = getLogMessage(results, 0);
- validateSubscriptionState(log, expectedState);
-
- // After being suspended the subscription should become active.
- expectedState = "ACTIVE";
- log = getLogMessage(results, 1);
- validateSubscriptionState(log, expectedState);
-
- // Validate that it was re-suspended
- expectedState = "SUSPENDED";
- log = getLogMessage(results, 2);
- validateSubscriptionState(log, expectedState);
- // We only need validate the state.
- }
- catch (AssertionFailedError afe)
- {
- System.err.println("Log Dump:");
- for (String log : results)
- {
- System.err.println(log);
- }
- throw afe;
- }
_connection.close();
}
- /**
- * Validate that the given log statement is a well formatted SUB-1003
- * message. That means the ID and expected state are correct.
- *
- * @param log the log to test
- * @param expectedState the state that should be logged.
- */
- private void validateSubscriptionState(String log, String expectedState)
- {
- validateMessageID("SUB-1003", log);
- String logMessage = getMessageString(fromMessage(log));
- assertTrue("Log Message does not start with 'State'" + logMessage,
- logMessage.startsWith("State"));
-
- assertTrue("Log Message does not have expected State of '"
- + expectedState + "'" + logMessage,
- logMessage.endsWith(expectedState));
- }
-
}
Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Fri Jun 5 16:06:55 2015
@@ -33,7 +33,6 @@ org.apache.qpid.test.unit.topic.DurableS
// 0-10 and 0-9 connections dont generate the exact same logging due to protocol differences
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped
org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
-org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionSuspend
// 0-10 is not supported by the MethodRegistry
org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org