You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/05 18:06:55 UTC

svn commit: r1683790 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/logging/messages/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/...

Author: rgodfrey
Date: Fri Jun  5 16:06:55 2015
New Revision: 1683790

URL: http://svn.apache.org/r1683790
Log:
QPID-6575 : Log subscription state change message (SUB-1003) only when a subscription has been suspended for an unusally long period of time

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java   (with props)
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
    qpid/java/trunk/test-profiles/Java010Excludes

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/SubscriptionMessages.java Fri Jun  5 16:06:55 2015
@@ -61,12 +61,12 @@ public class SubscriptionMessages
 
     /**
      * Log a Subscription message of the Format:
-     * <pre>SUB-1003 : State : {0}</pre>
+     * <pre>SUB-1003 : Suspended for {0,number} ms</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage STATE(String param1)
+    public static LogMessage STATE(Number param1)
     {
         String rawMessage = _messages.getString("STATE");
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Subscription_logmessages.properties Fri Jun  5 16:06:55 2015
@@ -21,4 +21,4 @@
 CREATE = SUB-1001 : Create[ : Durable][ : Arguments : {0}]
 CLOSE = SUB-1002 : Close
 # 0 - The current subscription state
-STATE = SUB-1003 : State : {0}
\ No newline at end of file
+STATE = SUB-1003 : Suspended for {0,number} ms

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java Fri Jun  5 16:06:55 2015
@@ -29,6 +29,11 @@ public interface Consumer<X extends Cons
     public String SELECTOR = "selector";
     public String SETTLEMENT_MODE = "settlementMode";
 
+    String SUSPEND_NOTIFICATION_PERIOD = "consumer.suspendNotificationPeriod";
+
+    @ManagedContextDefault( name = SUSPEND_NOTIFICATION_PERIOD)
+    long SUSPEND_NOTIFICATION_PERIOD_DEFAULT = 10000;
+
     @ManagedAttribute
     String getDistributionMode();
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Jun  5 16:06:55 2015
@@ -109,4 +109,6 @@ public interface AMQConnectionModel<T ex
 
     boolean isMessageAssignmentSuspended();
 
+    ServerProtocolEngine getProtocolEngine();
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Jun  5 16:06:55 2015
@@ -30,6 +30,7 @@ import org.apache.qpid.server.model.Cons
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.util.Deletable;
+import org.apache.qpid.transport.network.Ticker;
 
 /**
  * Session model interface.
@@ -117,4 +118,7 @@ public interface AMQSessionModel<T exten
     void transportStateChanged();
 
     void processPending();
+
+    void addTicker(Ticker ticker);
+    void removeTicker(Ticker ticker);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Fri Jun  5 16:06:55 2015
@@ -44,6 +44,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.ManagedAttributeField;
 import org.apache.qpid.server.model.State;
@@ -51,6 +52,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.MessageConverterRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.transport.network.AggregateTicker;
 
 class QueueConsumerImpl
     extends AbstractConfiguredObject<QueueConsumerImpl>
@@ -72,6 +74,8 @@ class QueueConsumerImpl
     private final Object _sessionReference;
     private final AbstractQueue _queue;
 
+    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;
+
     static final EnumMap<ConsumerTarget.State, State> STATE_MAP =
             new EnumMap<ConsumerTarget.State, State>(ConsumerTarget.State.class);
 
@@ -89,7 +93,7 @@ class QueueConsumerImpl
     {
         public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
         {
-            getEventLogger().message(QueueConsumerImpl.this, SubscriptionMessages.STATE(newState.toString()));
+            // no-op
         }
     };
     @ManagedAttributeField
@@ -103,7 +107,7 @@ class QueueConsumerImpl
     @ManagedAttributeField
     private String _selector;
 
-    QueueConsumerImpl(final AbstractQueue queue,
+    QueueConsumerImpl(final AbstractQueue<?> queue,
                       ConsumerTarget target, final String consumerName,
                       final FilterManager filters,
                       final Class<? extends ServerMessage> messageClass,
@@ -139,6 +143,15 @@ class QueueConsumerImpl
             }
         };
         _target.addStateListener(_listener);
+
+        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(queue.getContextValue(Long.class, Consumer.SUSPEND_NOTIFICATION_PERIOD))
+        {
+            @Override
+            protected void log(final long period)
+            {
+                getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(period));
+            }
+        };
     }
 
     @Override
@@ -185,9 +198,16 @@ class QueueConsumerImpl
                     getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
                 }
             }
+
+
+            if(newState == ConsumerTarget.State.SUSPENDED)
+            {
+                _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
+                getSessionModel().addTicker(_suspendedConsumerLoggingTicker);
+            }
             else
             {
-                getEventLogger().message(getLogSubject(), SubscriptionMessages.STATE(newState.toString()));
+                getSessionModel().removeTicker(_suspendedConsumerLoggingTicker);
             }
         }
 

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1683790&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java Fri Jun  5 16:06:55 2015
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.transport.network.Ticker;
+
+abstract public class SuspendedConsumerLoggingTicker implements Ticker
+{
+    private long _nextTick;
+    private long _startTime;
+    private final long _repeatPeriod;
+
+    public SuspendedConsumerLoggingTicker(final long repeatPeriod)
+    {
+        _repeatPeriod = repeatPeriod;
+    }
+
+    public void setStartTime(final long currentTime)
+    {
+        _startTime = currentTime;
+        _nextTick = currentTime + _repeatPeriod;
+    }
+
+    @Override
+    public int getTimeToNextTick(final long currentTime)
+    {
+        return (int) (_nextTick - currentTime);
+    }
+
+    @Override
+    public int tick(final long currentTime)
+    {
+        int nextTick = getTimeToNextTick(currentTime);
+        if(nextTick <= 0)
+        {
+            log(currentTime - _startTime);
+            _nextTick = _nextTick + _repeatPeriod;
+            nextTick = getTimeToNextTick(currentTime);
+        }
+        return nextTick;
+    }
+
+    abstract protected void log(long period);
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java Fri Jun  5 16:06:55 2015
@@ -53,11 +53,13 @@ import org.apache.qpid.server.model.port
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConsumerListener;
+import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.transport.network.Ticker;
 
 public class MockConsumer implements ConsumerTarget
 {
@@ -507,6 +509,18 @@ public class MockConsumer implements Con
         {
 
         }
+
+        @Override
+        public void addTicker(final Ticker ticker)
+        {
+
+        }
+
+        @Override
+        public void removeTicker(final Ticker ticker)
+        {
+
+        }
     }
 
     private static class MockConnectionModel implements AMQConnectionModel
@@ -645,6 +659,12 @@ public class MockConsumer implements Con
         }
 
         @Override
+        public ServerProtocolEngine getProtocolEngine()
+        {
+            return null;
+        }
+
+        @Override
         public String getClientVersion()
         {
             return null;

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java Fri Jun  5 16:06:55 2015
@@ -72,15 +72,5 @@ public class ConsumerMessagesTest extend
         validateLogMessage(log, "SUB-1002", expected);
     }
 
-    public void testSubscriptionState()
-    {
-        String state = "ACTIVE";
 
-        _logMessage = SubscriptionMessages.STATE(state);
-        List<Object> log = performLog();
-
-        String[] expected = {"State :", state};
-
-        validateLogMessage(log, "SUB-1003", expected);
-    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Jun  5 16:06:55 2015
@@ -207,6 +207,7 @@ public class ServerConnection extends Co
         super.setConnectionDelegate(delegate);
     }
 
+    @Override
     public ServerProtocolEngine getProtocolEngine()
     {
         return _serverProtocolEngine;

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Jun  5 16:06:55 2015
@@ -109,6 +109,7 @@ import org.apache.qpid.transport.RangeSe
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
 import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.network.Ticker;
 
 public class ServerSession extends Session
         implements AuthorizationHolder,
@@ -1161,6 +1162,18 @@ public class ServerSession extends Sessi
         }
     }
 
+    @Override
+    public void addTicker(final Ticker ticker)
+    {
+        getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+    }
+
+    @Override
+    public void removeTicker(final Ticker ticker)
+    {
+        getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+    }
+
 
     public final long getMaxUncommittedInMemorySize()
     {

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Jun  5 16:06:55 2015
@@ -114,6 +114,7 @@ import org.apache.qpid.server.virtualhos
 import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
 
 public class AMQChannel
         implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
@@ -3678,4 +3679,16 @@ public class AMQChannel
             target.processPending();
         }
     }
+
+    @Override
+    public void addTicker(final Ticker ticker)
+    {
+        getConnection().getAggregateTicker().addTicker(ticker);
+    }
+
+    @Override
+    public void removeTicker(final Ticker ticker)
+    {
+        getConnection().getAggregateTicker().removeTicker(ticker);
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Jun  5 16:06:55 2015
@@ -272,6 +272,12 @@ public class AMQProtocolEngine implement
     }
 
     @Override
+    public ServerProtocolEngine getProtocolEngine()
+    {
+        return this;
+    }
+
+    @Override
     public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
     {
         _messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Jun  5 16:06:55 2015
@@ -394,6 +394,7 @@ public class Connection_1_0 implements C
         return _port;
     }
 
+    @Override
     public ServerProtocolEngine getProtocolEngine()
     {
         return _protocolEngine;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Jun  5 16:06:55 2015
@@ -90,6 +90,7 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.QueueExistsException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.transport.network.Ticker;
 
 public class Session_1_0 implements SessionEventListener, AMQSessionModel<Session_1_0, Connection_1_0>, LogSubject
 {
@@ -913,6 +914,18 @@ public class Session_1_0 implements Sess
         }
     }
 
+    @Override
+    public void addTicker(final Ticker ticker)
+    {
+        getConnection().getProtocolEngine().getAggregateTicker().addTicker(ticker);
+    }
+
+    @Override
+    public void removeTicker(final Ticker ticker)
+    {
+        getConnection().getProtocolEngine().getAggregateTicker().removeTicker(ticker);
+    }
+
     private void consumerAdded(Consumer<?> consumer)
     {
         for(ConsumerListener l : _consumerListeners)

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java Fri Jun  5 16:06:55 2015
@@ -34,6 +34,7 @@ import javax.jms.Topic;
 import junit.framework.AssertionFailedError;
 
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.server.model.Consumer;
 
 /**
  * Subscription
@@ -58,6 +59,7 @@ public class ConsumerLoggingTest extends
     @Override
     public void setUp() throws Exception
     {
+        setSystemProperty(Consumer.SUSPEND_NOTIFICATION_PERIOD, "100");
         super.setUp();
         //Remove broker startup logging messages
         _monitor.markDiscardPoint();
@@ -319,89 +321,26 @@ public class ConsumerLoggingTest extends
         int SEND_COUNT = 16;
         sendMessage(_session, _queue, SEND_COUNT);
         _session.commit();
+
+        Thread.sleep(2000l);
+
+        List<String> results = waitAndFindMatches("SUB-1003");
+
+        assertTrue("Expected at least two suspension messages, but got " + results.size(), results.size() >= 2);
+
         // Retreive the first message, and start the flow of messages
         Message msg = consumer.receive(1000);
-        assertNotNull("First message not retrieved", msg);
+        assertNotNull("Message not retrieved", msg);
         _session.commit();
-        
-        // Drain the queue to ensure there is time for the ACTIVE log message
-        // Check that we can received all the messages
-        int receivedCount = 0;
-        while (msg != null)
-        {
-            receivedCount++;
-            msg = consumer.receive(1000);
-            _session.commit();
-        }
-
-        //Validate we received all the messages
-        assertEquals("Not all sent messages received.", SEND_COUNT, receivedCount);
-
-        // Fill the queue again to suspend the consumer
-        sendMessage(_session, _queue, SEND_COUNT);
+        msg = consumer.receive(1000);
+        assertNotNull("Message not retrieved", msg);
         _session.commit();
 
-        //Validate
-        List<String> results = waitAndFindMatches("SUB-1003");
+        int count = waitAndFindMatches("SUB-1003").size();
+        Thread.sleep(2000l);
+        assertEquals("More suspension messages were received unexpectedly", count, waitAndFindMatches("SUB-1003").size());
 
-        try
-        {
-            // Validation expects three messages.
-            // The Actor can be any one of the following depending on the exactly what is going on on the broker.
-            // Ideally we would test that we can get all of them but setting up
-            // the timing to do this in a consistent way is not beneficial.
-            // Ensuring the State is as expected is sufficient.
-// INFO - MESSAGE [vh(/test)/qu(example.queue)] [sub:6(qu(example.queue))] SUB-1003 : State :
-// INFO - MESSAGE [con:6(guest@anonymous(26562441)/test)/ch:3] [sub:6(qu(example.queue))] SUB-1003 : State :
-// INFO - MESSAGE [sub:6(vh(test)/qu(example.queue))] [sub:6(qu(example.queue))] SUB-1003 : State :
-
-            assertTrue("Result set not expected size:", 3 <= results.size());
-
-            // Validate Initial Suspension
-            String expectedState = "SUSPENDED";
-            String log = getLogMessage(results, 0);
-            validateSubscriptionState(log, expectedState);
-
-            // After being suspended the subscription should become active.
-            expectedState = "ACTIVE";
-            log = getLogMessage(results, 1);
-            validateSubscriptionState(log, expectedState);
-
-            // Validate that it was re-suspended
-            expectedState = "SUSPENDED";
-            log = getLogMessage(results, 2);
-            validateSubscriptionState(log, expectedState);
-            // We only need validate the state.
-        }
-        catch (AssertionFailedError afe)
-        {
-            System.err.println("Log Dump:");
-            for (String log : results)
-            {
-                System.err.println(log);
-            }
-            throw afe;
-        }
         _connection.close();
     }
 
-    /**
-     * Validate that the given log statement is a well formatted SUB-1003
-     * message. That means the ID and expected state are correct.
-     *
-     * @param log           the log to test
-     * @param expectedState the state that should be logged.
-     */
-    private void validateSubscriptionState(String log, String expectedState)
-    {
-        validateMessageID("SUB-1003", log);
-        String logMessage = getMessageString(fromMessage(log));
-        assertTrue("Log Message does not start with 'State'" + logMessage,
-                   logMessage.startsWith("State"));
-
-        assertTrue("Log Message does not have expected State of '"
-                   + expectedState + "'" + logMessage,
-                   logMessage.endsWith(expectedState));
-    }
-
 }

Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1683790&r1=1683789&r2=1683790&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Fri Jun  5 16:06:55 2015
@@ -33,7 +33,6 @@ org.apache.qpid.test.unit.topic.DurableS
 // 0-10 and 0-9 connections dont generate the exact same logging due to protocol differences
 org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartsFlowStopped
 org.apache.qpid.server.logging.ChannelLoggingTest#testChannelStartConsumerFlowStarted
-org.apache.qpid.server.logging.ConsumerLoggingTest#testSubscriptionSuspend
 
 // 0-10 is not supported by the MethodRegistry
 org.apache.qpid.test.unit.close.JavaServerCloseRaceConditionTest#*



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org