You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 17:53:42 UTC

svn commit: r1632618 - in /qpid/trunk/qpid/java: broker-core/ broker-core/src/main/java/org/apache/qpid/server/logging/messages/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-c...

Author: rgodfrey
Date: Fri Oct 17 15:53:42 2014
New Revision: 1632618

URL: http://svn.apache.org/r1632618
Log:
QPID-6163 : [Java Broker] Disconnect clients which do not obey flow control

Modified:
    qpid/trunk/qpid/java/broker-core/build-generate-sources.xml
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/trunk/qpid/java/broker-core/build-generate-sources.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/build-generate-sources.xml?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/build-generate-sources.xml (original)
+++ qpid/trunk/qpid/java/broker-core/build-generate-sources.xml Fri Oct 17 15:53:42 2014
@@ -79,7 +79,7 @@
 
       <echo message="logmessages is ${logmessages}"/>
 
-      <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" dir="${gentools.classes}" failonerror="true">
+      <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" failonerror="true">
         <arg line="'${logmessages}'"/>
           <arg value="-j"/>
           <arg value="-o"/>

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java Fri Oct 17 15:53:42 2014
@@ -22,14 +22,15 @@ package org.apache.qpid.server.logging.m
 
 import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.LogMessage;
-
 import java.text.MessageFormat;
 import java.util.Locale;
 import java.util.ResourceBundle;
 
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.logging.LogMessage;
+
 /**
  * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
  *
@@ -53,6 +54,7 @@ public class ChannelMessages
     public static final String DEADLETTERMSG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg";
     public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
     public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
+    public static final String FLOW_CONTROL_IGNORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_control_ignored";
     public static final String DISCARDMSG_NOROUTE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noroute";
     public static final String OPEN_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.open_txn";
     public static final String FLOW_REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_removed";
@@ -69,6 +71,7 @@ public class ChannelMessages
         Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY);
         Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
         Logger.getLogger(IDLE_TXN_LOG_HIERARCHY);
+        Logger.getLogger(FLOW_CONTROL_IGNORED_LOG_HIERARCHY);
         Logger.getLogger(DISCARDMSG_NOROUTE_LOG_HIERARCHY);
         Logger.getLogger(OPEN_TXN_LOG_HIERARCHY);
         Logger.getLogger(FLOW_REMOVED_LOG_HIERARCHY);
@@ -356,6 +359,33 @@ public class ChannelMessages
 
     /**
      * Log a Channel message of the Format:
+     * <pre>CHN-1012 : Flow Control Ignored. Channel will be closed.</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage FLOW_CONTROL_IGNORED()
+    {
+        String rawMessage = _messages.getString("FLOW_CONTROL_IGNORED");
+
+        final String message = rawMessage;
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return FLOW_CONTROL_IGNORED_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Channel message of the Format:
      * <pre>CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Fri Oct 17 15:53:42 2014
@@ -38,3 +38,5 @@ IDLE_TXN = CHN-1008 : Idle Transaction :
 DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
 DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
 DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
+
+FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Fri Oct 17 15:53:42 2014
@@ -51,6 +51,8 @@ public interface Broker<X extends Broker
     String MODEL_VERSION = "modelVersion";
     String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider";
 
+    String CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = "channel.flowControlEnforcementTimeout";
+
     String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
     String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
     String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
@@ -63,19 +65,22 @@ public interface Broker<X extends Broker
     String QPID_JMX_PORT  = "qpid.jmx_port";
 
     @ManagedContextDefault(name = "broker.name")
-    static final String DEFAULT_BROKER_NAME = "Broker";
+    String DEFAULT_BROKER_NAME = "Broker";
 
     @ManagedContextDefault(name = QPID_AMQP_PORT)
-    public static final String DEFAULT_AMQP_PORT_NUMBER = "5672";
+    String DEFAULT_AMQP_PORT_NUMBER = "5672";
     @ManagedContextDefault(name = QPID_HTTP_PORT)
-    public static final String DEFAULT_HTTP_PORT_NUMBER = "8080";
+    String DEFAULT_HTTP_PORT_NUMBER = "8080";
     @ManagedContextDefault(name = QPID_RMI_PORT)
-    public static final String DEFAULT_RMI_PORT_NUMBER  = "8999";
+    String DEFAULT_RMI_PORT_NUMBER  = "8999";
     @ManagedContextDefault(name = QPID_JMX_PORT)
-    public static final String DEFAULT_JMX_PORT_NUMBER  = "9099";
+    String DEFAULT_JMX_PORT_NUMBER  = "9099";
 
     @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
-    public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+    long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
+    @ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)
+    long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000L; // milliseconds; compared against System.currentTimeMillis() deltas
 
     String BROKER_FRAME_SIZE = "qpid.broker_frame_size";
     @ManagedContextDefault(name = BROKER_FRAME_SIZE)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 17 15:53:42 2014
@@ -36,7 +36,7 @@ import org.apache.qpid.server.util.Delet
  * Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
  * when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
  */
-public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T>
+public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<AMQSessionModel>, Deletable<T>
 {
     public UUID getId();
 

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Fri Oct 17 15:53:42 2014
@@ -97,6 +97,7 @@ public class BrokerTestHelper
         when(broker.getEventLogger()).thenReturn(eventLogger);
         when(broker.getCategoryClass()).thenReturn(Broker.class);
         when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
+        when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
 
         when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
         when(systemConfig.getTaskExecutor()).thenReturn(TASK_EXECUTOR);

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Oct 17 15:53:42 2014
@@ -65,7 +65,7 @@ public class ServerConnection extends Co
                                                             LogSubject, AuthorizationHolder
 {
 
-    private final Broker _broker;
+    private final Broker<?> _broker;
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
 
@@ -106,6 +106,11 @@ public class ServerConnection extends Co
         return _reference;
     }
 
+    public Broker<?> getBroker()
+    {
+        return _broker;
+    }
+
     @Override
     protected void invoke(Method method)
     {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Oct 17 15:53:42 2014
@@ -62,6 +62,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
@@ -131,6 +132,8 @@ public class ServerSession extends Sessi
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private org.apache.qpid.server.model.Session<?> _modelObject;
+    private long _blockTime;
+    private long _blockingTimeout;
 
 
     public static interface MessageDispositionChangeListener
@@ -182,6 +185,9 @@ public class ServerSession extends Sessi
                 getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
             }
         }, getVirtualHost());
+
+        _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class,
+                                                                  Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
     }
 
     protected void setState(final State state)
@@ -774,6 +780,7 @@ public class ServerSession extends Sessi
                     {
                         invokeBlock();
                     }
+                    _blockTime = System.currentTimeMillis();
                     getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
                 }
 
@@ -798,7 +805,7 @@ public class ServerSession extends Sessi
         {
             if(_blocking.compareAndSet(true,false) && !isClosing())
             {
-
+                _blockTime = 0l;
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
                 MessageFlow mf = new MessageFlow();
                 mf.setUnit(MessageCreditUnit.MESSAGE);
@@ -812,6 +819,17 @@ public class ServerSession extends Sessi
         }
     }
 
+    /** Reports whether this session has remained blocked for longer than {@code _blockingTimeout} milliseconds. */
+    boolean blockingTimeoutExceeded()
+    {
+        // Copy the field once so the zero check and the subtraction below use the same value.
+        final long blockTime = _blockTime;
+        // _blockTime is reset to 0 when the block is lifted, so 0 means there is no block timestamp to measure from.
+        return _blocking.get()
+               && blockTime != 0
+               && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+    }
+
     @Override
     public Object getConnectionReference()
     {
@@ -1065,7 +1083,7 @@ public class ServerSession extends Sessi
     }
 
     @Override
-    public int compareTo(ServerSession o)
+    public int compareTo(AMQSessionModel o)
     {
         return getId().compareTo(o.getId());
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Oct 17 15:53:42 2014
@@ -34,11 +34,13 @@ import java.util.UUID;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.filter.AMQInvalidArgumentException;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
@@ -331,84 +333,103 @@ public class ServerSessionDelegate exten
     @Override
     public void messageTransfer(Session ssn, final MessageTransfer xfr)
     {
-        final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
-
-        final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
-        if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+        if(((ServerSession)ssn).blockingTimeoutExceeded())
         {
-            delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
-        }
+            getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
 
-        final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
-
-        final VirtualHostImpl virtualHost = getVirtualHost(ssn);
-        try
-        {
-            virtualHost.getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName(), virtualHost.getName());
+            ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE,
+                                        "Session flow control was requested, but not enforced by sender");
         }
-        catch (AccessControlException e)
+        else
         {
-            ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
-            exception(ssn, xfr, errorCode, e.getMessage());
+            final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
 
-            return;
-        }
+            final DeliveryProperties delvProps =
+                    xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+            if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+            {
+                delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+            }
 
-        final MessageStore store = virtualHost.getMessageStore();
-        final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
-        final ServerSession serverSession = (ServerSession) ssn;
-        final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
-        MessageReference<MessageTransferMessage> reference = message.newReference();
+            final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
 
-        final InstanceProperties instanceProperties = new InstanceProperties()
-        {
-            @Override
-            public Object getProperty(final Property prop)
+            final VirtualHostImpl virtualHost = getVirtualHost(ssn);
+            try
+            {
+                virtualHost.getSecurityManager()
+                        .authorisePublish(messageMetaData.isImmediate(),
+                                          messageMetaData.getRoutingKey(),
+                                          exchange.getName(),
+                                          virtualHost.getName());
+            }
+            catch (AccessControlException e)
             {
-                switch(prop)
+                ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+                exception(ssn, xfr, errorCode, e.getMessage());
+
+                return;
+            }
+
+            final MessageStore store = virtualHost.getMessageStore();
+            final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+            final ServerSession serverSession = (ServerSession) ssn;
+            final MessageTransferMessage message =
+                    new MessageTransferMessage(storeMessage, serverSession.getReference());
+            MessageReference<MessageTransferMessage> reference = message.newReference();
+
+            final InstanceProperties instanceProperties = new InstanceProperties()
+            {
+                @Override
+                public Object getProperty(final Property prop)
+                {
+                    switch (prop)
+                    {
+                        case EXPIRATION:
+                            return message.getExpiration();
+                        case IMMEDIATE:
+                            return message.isImmediate();
+                        case MANDATORY:
+                            return (delvProps == null || !delvProps.getDiscardUnroutable())
+                                   && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+                        case PERSISTENT:
+                            return message.isPersistent();
+                        case REDELIVERED:
+                            return delvProps.getRedelivered();
+                    }
+                    return null;
+                }
+            };
+
+            int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
+
+            if (enqueues == 0)
+            {
+                if ((delvProps == null || !delvProps.getDiscardUnroutable())
+                    && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+                {
+                    RangeSet rejects = RangeSetFactory.createRangeSet();
+                    rejects.add(xfr.getId());
+                    MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+                    ssn.invoke(reject);
+                }
+                else
                 {
-                    case EXPIRATION:
-                        return message.getExpiration();
-                    case IMMEDIATE:
-                        return message.isImmediate();
-                    case MANDATORY:
-                        return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
-                    case PERSISTENT:
-                        return message.isPersistent();
-                    case REDELIVERED:
-                        return delvProps.getRedelivered();
+                    virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
+                                                                                     messageMetaData.getRoutingKey()));
                 }
-                return null;
             }
-        };
-
-        int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
 
-        if(enqueues == 0)
-        {
-            if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+            if (serverSession.isTransactional())
             {
-                RangeSet rejects = RangeSetFactory.createRangeSet();
-                rejects.add(xfr.getId());
-                MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
-                ssn.invoke(reject);
+                serverSession.processed(xfr);
             }
             else
             {
-                virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
-                                                                                 messageMetaData.getRoutingKey()));
+                serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+                                           new CommandProcessedAction(serverSession, xfr));
             }
+            reference.release();
         }
-
-        if(serverSession.isTransactional())
-        {
-            serverSession.processed(xfr);
-        }
-        else
-        {
-            serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
-        }
-        reference.release();
     }
 
     private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Fri Oct 17 15:53:42 2014
@@ -18,14 +18,16 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.test.utils.QpidTestCase;
 import org.apache.qpid.transport.Binary;
 
-import static org.mockito.Mockito.mock;
-
 public class ServerSessionTest extends QpidTestCase
 {
 
@@ -59,6 +61,8 @@ public class ServerSessionTest extends Q
     public void testCompareTo() throws Exception
     {
         final Broker broker = mock(Broker.class);
+        when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
+
         ServerConnection connection = new ServerConnection(1, broker);
         connection.setVirtualHost(_virtualHost);
         ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 17 15:53:42 2014
@@ -79,6 +79,7 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Consumer;
@@ -201,6 +202,8 @@ public class AMQChannel
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
+    private long _blockTime;
+    private long _blockingTimeout;
     private boolean _confirmOnPublish;
     private long _confirmedMessageCounter;
 
@@ -217,7 +220,8 @@ public class AMQChannel
         _logSubject = new ChannelLogSubject(this);
 
         _messageStore = messageStore;
-
+        _blockingTimeout = connection.getBroker().getContextValue(Long.class,
+                                                                  Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
@@ -1317,7 +1321,7 @@ public class AMQChannel
     }
 
     @Override
-    public int compareTo(AMQChannel o)
+    public int compareTo(AMQSessionModel o)
     {
         return getId().compareTo(o.getId());
     }
@@ -1554,6 +1558,7 @@ public class AMQChannel
                 getVirtualHost().getEventLogger().message(_logSubject,
                                                           ChannelMessages.FLOW_ENFORCED("** All Queues **"));
                 flow(false);
+                _blockTime = System.currentTimeMillis();
             }
         }
     }
@@ -1580,6 +1585,8 @@ public class AMQChannel
             {
                 getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
                 flow(false);
+                _blockTime = System.currentTimeMillis();
+
             }
         }
     }
@@ -2146,44 +2153,61 @@ public class AMQChannel
                           " immediate: " + immediate + " ]");
         }
 
-        VirtualHostImpl vHost = _connection.getVirtualHost();
 
-        MessageDestination destination;
 
-        if (isDefaultExchange(exchangeName))
-        {
-            destination = vHost.getDefaultDestination();
-        }
-        else
-        {
-            destination = vHost.getMessageDestination(exchangeName.toString());
-        }
+        VirtualHostImpl vHost = _connection.getVirtualHost();
 
-        // if the exchange does not exist we raise a channel exception
-        if (destination == null)
+        if(blockingTimeoutExceeded())
         {
-            closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+            getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
+            closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+                         "Channel flow control was requested, but not enforced by sender");
         }
         else
         {
+            MessageDestination destination;
 
-            MessagePublishInfo info = new MessagePublishInfo(exchangeName,
-                                                             immediate,
-                                                             mandatory,
-                                                             routingKey);
+            if (isDefaultExchange(exchangeName))
+            {
+                destination = vHost.getDefaultDestination();
+            }
+            else
+            {
+                destination = vHost.getMessageDestination(exchangeName.toString());
+            }
 
-            try
+            // if the exchange does not exist we raise a channel exception
+            if (destination == null)
             {
-                setPublishFrame(info, destination);
+                closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
             }
-            catch (AccessControlException e)
+            else
             {
-                _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
 
+                MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+                                                                 immediate,
+                                                                 mandatory,
+                                                                 routingKey);
+
+                try
+                {
+                    setPublishFrame(info, destination);
+                }
+                catch (AccessControlException e)
+                {
+                    _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+                }
             }
         }
     }
 
+    /** True once the channel has been blocked longer than _blockingTimeout ms; a zero _blockTime means no block time is recorded yet. */
+    private boolean blockingTimeoutExceeded()
+    {
+        return _blocking.get() && _blockTime != 0L && (System.currentTimeMillis() - _blockTime) > _blockingTimeout;
+    }
+
     @Override
     public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
     {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Oct 17 15:53:42 2014
@@ -722,7 +722,7 @@ public class Session_1_0 implements Sess
     }
 
     @Override
-    public int compareTo(Session_1_0 o)
+    public int compareTo(AMQSessionModel o)
     {
         return getId().compareTo(o.getId());
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org