You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 17:53:42 UTC
svn commit: r1632618 - in /qpid/trunk/qpid/java: broker-core/
broker-core/src/main/java/org/apache/qpid/server/logging/messages/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-c...
Author: rgodfrey
Date: Fri Oct 17 15:53:42 2014
New Revision: 1632618
URL: http://svn.apache.org/r1632618
Log:
QPID-6163 : [Java Broker] Disconnect clients which do not obey flow control
Modified:
qpid/trunk/qpid/java/broker-core/build-generate-sources.xml
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/trunk/qpid/java/broker-core/build-generate-sources.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/build-generate-sources.xml?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/build-generate-sources.xml (original)
+++ qpid/trunk/qpid/java/broker-core/build-generate-sources.xml Fri Oct 17 15:53:42 2014
@@ -79,7 +79,7 @@
<echo message="logmessages is ${logmessages}"/>
- <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" dir="${gentools.classes}" failonerror="true">
+ <java classname="org.apache.qpid.server.logging.GenerateLogMessages" fork="true" failonerror="true">
<arg line="'${logmessages}'"/>
<arg value="-j"/>
<arg value="-o"/>
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java Fri Oct 17 15:53:42 2014
@@ -22,14 +22,15 @@ package org.apache.qpid.server.logging.m
import static org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.logging.LogMessage;
-
import java.text.MessageFormat;
import java.util.Locale;
import java.util.ResourceBundle;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.logging.LogMessage;
+
/**
* DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
*
@@ -53,6 +54,7 @@ public class ChannelMessages
public static final String DEADLETTERMSG_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg";
public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
public static final String IDLE_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
+ public static final String FLOW_CONTROL_IGNORED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_control_ignored";
public static final String DISCARDMSG_NOROUTE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noroute";
public static final String OPEN_TXN_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.open_txn";
public static final String FLOW_REMOVED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "channel.flow_removed";
@@ -69,6 +71,7 @@ public class ChannelMessages
Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY);
Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
Logger.getLogger(IDLE_TXN_LOG_HIERARCHY);
+ Logger.getLogger(FLOW_CONTROL_IGNORED_LOG_HIERARCHY);
Logger.getLogger(DISCARDMSG_NOROUTE_LOG_HIERARCHY);
Logger.getLogger(OPEN_TXN_LOG_HIERARCHY);
Logger.getLogger(FLOW_REMOVED_LOG_HIERARCHY);
@@ -356,6 +359,33 @@ public class ChannelMessages
/**
* Log a Channel message of the Format:
+ * <pre>CHN-1012 : Flow Control Ignored. Channel will be closed.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage FLOW_CONTROL_IGNORED()
+ {
+ String rawMessage = _messages.getString("FLOW_CONTROL_IGNORED");
+
+ final String message = rawMessage;
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return FLOW_CONTROL_IGNORED_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Channel message of the Format:
* <pre>CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties Fri Oct 17 15:53:42 2014
@@ -38,3 +38,5 @@ IDLE_TXN = CHN-1008 : Idle Transaction :
DISCARDMSG_NOALTEXCH = CHN-1009 : Discarded message : {0,number} as no alternate exchange configured for queue : {1} routing key : {2}
DISCARDMSG_NOROUTE = CHN-1010 : Discarded message : {0,number} as no binding on alternate exchange : {1}
DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : {1}
+
+FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Fri Oct 17 15:53:42 2014
@@ -51,6 +51,8 @@ public interface Broker<X extends Broker
String MODEL_VERSION = "modelVersion";
String CONFIDENTIAL_CONFIGURATION_ENCRYPTION_PROVIDER = "confidentialConfigurationEncryptionProvider";
+ String CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = "channel.flowControlEnforcementTimeout";
+
String CONNECTION_SESSION_COUNT_LIMIT = "connection.sessionCountLimit";
String CONNECTION_HEART_BEAT_DELAY = "connection.heartBeatDelay";
String CONNECTION_CLOSE_WHEN_NO_ROUTE = "connection.closeWhenNoRoute";
@@ -63,19 +65,22 @@ public interface Broker<X extends Broker
String QPID_JMX_PORT = "qpid.jmx_port";
@ManagedContextDefault(name = "broker.name")
- static final String DEFAULT_BROKER_NAME = "Broker";
+ String DEFAULT_BROKER_NAME = "Broker";
@ManagedContextDefault(name = QPID_AMQP_PORT)
- public static final String DEFAULT_AMQP_PORT_NUMBER = "5672";
+ String DEFAULT_AMQP_PORT_NUMBER = "5672";
@ManagedContextDefault(name = QPID_HTTP_PORT)
- public static final String DEFAULT_HTTP_PORT_NUMBER = "8080";
+ String DEFAULT_HTTP_PORT_NUMBER = "8080";
@ManagedContextDefault(name = QPID_RMI_PORT)
- public static final String DEFAULT_RMI_PORT_NUMBER = "8999";
+ String DEFAULT_RMI_PORT_NUMBER = "8999";
@ManagedContextDefault(name = QPID_JMX_PORT)
- public static final String DEFAULT_JMX_PORT_NUMBER = "9099";
+ String DEFAULT_JMX_PORT_NUMBER = "9099";
@ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
- public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+ long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
+
+    @ManagedContextDefault(name = CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT)
+    long DEFAULT_CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT = 5000L; // milliseconds; compared against System.currentTimeMillis() deltas
String BROKER_FRAME_SIZE = "qpid.broker_frame_size";
@ManagedContextDefault(name = BROKER_FRAME_SIZE)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Oct 17 15:53:42 2014
@@ -36,7 +36,7 @@ import org.apache.qpid.server.util.Delet
* Extends {@link Comparable} to allow objects to be inserted into a {@link ConcurrentSkipListSet}
* when monitoring the blocking and blocking of queues/sessions in {@link AMQQueue}.
*/
-public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<T>, Deletable<T>
+public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQConnectionModel<C,T>> extends Comparable<AMQSessionModel>, Deletable<T>
{
public UUID getId();
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java Fri Oct 17 15:53:42 2014
@@ -97,6 +97,7 @@ public class BrokerTestHelper
when(broker.getEventLogger()).thenReturn(eventLogger);
when(broker.getCategoryClass()).thenReturn(Broker.class);
when(broker.getParent(SystemConfig.class)).thenReturn(systemConfig);
+ when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
when(systemConfig.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java Fri Oct 17 15:53:42 2014
@@ -65,7 +65,7 @@ public class ServerConnection extends Co
LogSubject, AuthorizationHolder
{
- private final Broker _broker;
+ private final Broker<?> _broker;
private Runnable _onOpenTask;
private AtomicBoolean _logClosed = new AtomicBoolean(false);
@@ -106,6 +106,11 @@ public class ServerConnection extends Co
return _reference;
}
+ public Broker<?> getBroker()
+ {
+ return _broker;
+ }
+
@Override
protected void invoke(Method method)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Oct 17 15:53:42 2014
@@ -62,6 +62,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -131,6 +132,8 @@ public class ServerSession extends Sessi
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private org.apache.qpid.server.model.Session<?> _modelObject;
+ private long _blockTime;
+ private long _blockingTimeout;
public static interface MessageDispositionChangeListener
@@ -182,6 +185,9 @@ public class ServerSession extends Sessi
getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
}, getVirtualHost());
+
+ _blockingTimeout = ((ServerConnection)connection).getBroker().getContextValue(Long.class,
+ Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
}
protected void setState(final State state)
@@ -774,6 +780,7 @@ public class ServerSession extends Sessi
{
invokeBlock();
}
+ _blockTime = System.currentTimeMillis();
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
}
@@ -798,7 +805,7 @@ public class ServerSession extends Sessi
{
if(_blocking.compareAndSet(true,false) && !isClosing())
{
-
+ _blockTime = 0l;
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_REMOVED());
MessageFlow mf = new MessageFlow();
mf.setUnit(MessageCreditUnit.MESSAGE);
@@ -812,6 +819,17 @@ public class ServerSession extends Sessi
}
}
+    /**
+     * Reports whether this session has remained blocked for longer than the configured
+     * flow control enforcement timeout (both values are in milliseconds).
+     */
+    boolean blockingTimeoutExceeded()
+    {
+        // Read _blockTime once; 0 means no block timestamp is currently recorded (unblock resets it to 0).
+        final long blockTime = _blockTime;
+        return _blocking.get() && blockTime != 0 && (System.currentTimeMillis() - blockTime) > _blockingTimeout;
+    }
+
@Override
public Object getConnectionReference()
{
@@ -1065,7 +1083,7 @@ public class ServerSession extends Sessi
}
@Override
- public int compareTo(ServerSession o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Oct 17 15:53:42 2014
@@ -34,11 +34,13 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
@@ -331,84 +333,103 @@ public class ServerSessionDelegate exten
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
-
- final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
- if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ if(((ServerSession)ssn).blockingTimeoutExceeded())
{
- delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
- }
+ getVirtualHost(ssn).getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
- final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
-
- final VirtualHostImpl virtualHost = getVirtualHost(ssn);
- try
- {
- virtualHost.getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName(), virtualHost.getName());
+ ((ServerSession) ssn).close(AMQConstant.MESSAGE_TOO_LARGE,
+ "Session flow control was requested, but not enforced by sender");
}
- catch (AccessControlException e)
+ else
{
- ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
- exception(ssn, xfr, errorCode, e.getMessage());
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
- return;
- }
+ final DeliveryProperties delvProps =
+ xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
+ if (delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+ {
+ delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
+ }
- final MessageStore store = virtualHost.getMessageStore();
- final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
- final ServerSession serverSession = (ServerSession) ssn;
- final MessageTransferMessage message = new MessageTransferMessage(storeMessage, serverSession.getReference());
- MessageReference<MessageTransferMessage> reference = message.newReference();
+ final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
- final InstanceProperties instanceProperties = new InstanceProperties()
- {
- @Override
- public Object getProperty(final Property prop)
+ final VirtualHostImpl virtualHost = getVirtualHost(ssn);
+ try
+ {
+ virtualHost.getSecurityManager()
+ .authorisePublish(messageMetaData.isImmediate(),
+ messageMetaData.getRoutingKey(),
+ exchange.getName(),
+ virtualHost.getName());
+ }
+ catch (AccessControlException e)
{
- switch(prop)
+ ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
+ exception(ssn, xfr, errorCode, e.getMessage());
+
+ return;
+ }
+
+ final MessageStore store = virtualHost.getMessageStore();
+ final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
+ final ServerSession serverSession = (ServerSession) ssn;
+ final MessageTransferMessage message =
+ new MessageTransferMessage(storeMessage, serverSession.getReference());
+ MessageReference<MessageTransferMessage> reference = message.newReference();
+
+ final InstanceProperties instanceProperties = new InstanceProperties()
+ {
+ @Override
+ public Object getProperty(final Property prop)
+ {
+ switch (prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
+ }
+ };
+
+ int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
+
+ if (enqueues == 0)
+ {
+ if ((delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = RangeSetFactory.createRangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
{
- case EXPIRATION:
- return message.getExpiration();
- case IMMEDIATE:
- return message.isImmediate();
- case MANDATORY:
- return (delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
- case PERSISTENT:
- return message.isPersistent();
- case REDELIVERED:
- return delvProps.getRedelivered();
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
+ messageMetaData.getRoutingKey()));
}
- return null;
}
- };
-
- int enqueues = serverSession.enqueue(message, instanceProperties, exchange);
- if(enqueues == 0)
- {
- if((delvProps == null || !delvProps.getDiscardUnroutable()) && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if (serverSession.isTransactional())
{
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
+ serverSession.processed(xfr);
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(exchange.getName(),
- messageMetaData.getRoutingKey()));
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, xfr));
}
+ reference.release();
}
-
- if(serverSession.isTransactional())
- {
- serverSession.processed(xfr);
- }
- else
- {
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
- }
- reference.release();
}
private StoredMessage<MessageMetaData_0_10> createStoreMessage(final MessageTransfer xfr,
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/ServerSessionTest.java Fri Oct 17 15:53:42 2014
@@ -18,14 +18,16 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.Binary;
-import static org.mockito.Mockito.mock;
-
public class ServerSessionTest extends QpidTestCase
{
@@ -59,6 +61,8 @@ public class ServerSessionTest extends Q
public void testCompareTo() throws Exception
{
final Broker broker = mock(Broker.class);
+ when(broker.getContextValue(eq(Long.class), eq(Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT))).thenReturn(0l);
+
ServerConnection connection = new ServerConnection(1, broker);
connection.setVirtualHost(_virtualHost);
ServerSession session1 = new ServerSession(connection, new ServerSessionDelegate(),
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 17 15:53:42 2014
@@ -79,6 +79,7 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
@@ -201,6 +202,8 @@ public class AMQChannel
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private long _blockTime;
+ private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
@@ -217,7 +220,8 @@ public class AMQChannel
_logSubject = new ChannelLogSubject(this);
_messageStore = messageStore;
-
+ _blockingTimeout = connection.getBroker().getContextValue(Long.class,
+ Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
@@ -1317,7 +1321,7 @@ public class AMQChannel
}
@Override
- public int compareTo(AMQChannel o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
@@ -1554,6 +1558,7 @@ public class AMQChannel
getVirtualHost().getEventLogger().message(_logSubject,
ChannelMessages.FLOW_ENFORCED("** All Queues **"));
flow(false);
+ _blockTime = System.currentTimeMillis();
}
}
}
@@ -1580,6 +1585,8 @@ public class AMQChannel
{
getVirtualHost().getEventLogger().message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getName()));
flow(false);
+ _blockTime = System.currentTimeMillis();
+
}
}
}
@@ -2146,44 +2153,61 @@ public class AMQChannel
" immediate: " + immediate + " ]");
}
- VirtualHostImpl vHost = _connection.getVirtualHost();
- MessageDestination destination;
- if (isDefaultExchange(exchangeName))
- {
- destination = vHost.getDefaultDestination();
- }
- else
- {
- destination = vHost.getMessageDestination(exchangeName.toString());
- }
+ VirtualHostImpl vHost = _connection.getVirtualHost();
- // if the exchange does not exist we raise a channel exception
- if (destination == null)
+ if(blockingTimeoutExceeded())
{
- closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+ getVirtualHost().getEventLogger().message(ChannelMessages.FLOW_CONTROL_IGNORED());
+ closeChannel(AMQConstant.MESSAGE_TOO_LARGE,
+ "Channel flow control was requested, but not enforced by sender");
}
else
{
+ MessageDestination destination;
- MessagePublishInfo info = new MessagePublishInfo(exchangeName,
- immediate,
- mandatory,
- routingKey);
+ if (isDefaultExchange(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
- try
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
{
- setPublishFrame(info, destination);
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
}
- catch (AccessControlException e)
+ else
{
- _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+ immediate,
+ mandatory,
+ routingKey);
+
+ try
+ {
+ setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
}
}
}
+    /** Reports whether this channel has stayed blocked longer than the flow control enforcement timeout (ms). */
+    private boolean blockingTimeoutExceeded()
+    {
+        return _blocking.get() && _blockTime != 0 && (System.currentTimeMillis() - _blockTime) > _blockingTimeout; // 0: no block time recorded yet
+    }
+
@Override
public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1632618&r1=1632617&r2=1632618&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Oct 17 15:53:42 2014
@@ -722,7 +722,7 @@ public class Session_1_0 implements Sess
}
@Override
- public int compareTo(Session_1_0 o)
+ public int compareTo(AMQSessionModel o)
{
return getId().compareTo(o.getId());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org