You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by bizcenter <bi...@gmail.com> on 2013/06/25 12:53:31 UTC
Producer Window Size
In org.apache.activemq.broker.region.Queue#send(ProducerBrokerExchange,
Message), it just determine that the producerWindowSize is greater than 0,
so set producerWindowSize to 1024 or 10240 can do the same effect??? am i
right?
public void send(final ProducerBrokerExchange producerExchange, final
Message message) throws Exception {
final ConnectionContext context =
producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at
the
// destination.. it may have expired.
message.setRegionDestination(this);
ProducerState state = producerExchange.getProducerState();
if (state == null) {
LOG.warn("Send failed for: " + message + ", missing producer
state for: " + producerExchange);
throw new JMSException("Cannot send message to " +
getActiveMQDestination() + " with invalid (null) producer state");
}
final ProducerInfo producerInfo =
producerExchange.getProducerState().getInfo();
* final boolean sendProducerAck = !message.isResponseRequired() &&
producerInfo.getWindowSize() > 0*
&& !context.isInRecoveryMode();
if (message.isExpired()) {
// message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message, null);
if (sendProducerAck) {
ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
}
return;
}
if (memoryUsage.isFull()) {
isFull(context, memoryUsage);
fastProducer(context, producerInfo);
if (isProducerFlowControl() && context.isProducerFlowControl())
{
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
LOG
.info("Usage Manager Memory Limit ("
+ memoryUsage.getLimit()
+ ") reached on "
+
getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the
rate at which messages are removed from this destination to prevent flooding
it."
+ " See
http://activemq.apache.org/producer-flow-control.html for more info");
}
if (!context.isNetworkConnection() &&
systemUsage.isSendFailIfNoSpace()) {
throw new ResourceAllocationException("Usage Manager
Memory Limit reached. Stopping producer ("
+ message.getProducerId() + ") to prevent
flooding "
+ getActiveMQDestination().getQualifiedName() +
"."
+ " See
http://activemq.apache.org/producer-flow-control.html for more info");
}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
*if (producerInfo.getWindowSize() > 0 ||
message.isResponseRequired()) {*
// copy the exchange state since the context will be
// modified while we are waiting
// for space.
final ProducerBrokerExchange producerExchangeCopy =
producerExchange.copy();
synchronized (messagesWaitingForSpace) {
// Start flow control timeout task
// Prevent trying to start it multiple times
if (!flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.setName(getName()+"
Producer Flow Control Timeout Task");
flowControlTimeoutTask.start();
}
messagesWaitingForSpace.put(message.getMessageId(),
new Runnable() {
public void run() {
try {
// While waiting for space to free up...
the
// message may have expired.
if (message.isExpired()) {
LOG.error("expired waiting for
space..");
broker.messageExpired(context,
message, null);
destinationStatistics.getExpired().increment();
} else {
doMessageSend(producerExchangeCopy,
message);
}
if (sendProducerAck) {
ProducerAck ack = new
ProducerAck(producerInfo.getProducerId(), message
.getSize());
context.getConnection().dispatchAsync(ack);
} else {
Response response = new Response();
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
}
} catch (Exception e) {
if (!sendProducerAck &&
!context.isInRecoveryMode()) {
ExceptionResponse response = new
ExceptionResponse(e);
response.setCorrelationId(message.getCommandId());
context.getConnection().dispatchAsync(response);
} else {
LOG.debug("unexpected exception on
deferred send of :" + message, e);
}
}
}
});
if (!context.isNetworkConnection() &&
systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
flowControlTimeoutMessages.add(new
TimeoutMessage(message, context, systemUsage
.getSendFailIfNoSpaceAfterTimeout()));
}
registerCallbackForNotFullNotification();
context.setDontSendReponse(true);
return;
}
} else {
if (memoryUsage.isFull()) {
waitForSpace(context, memoryUsage, "Usage Manager
Memory Limit reached. Producer ("
+ message.getProducerId() + ") stopped to
prevent flooding "
+
getActiveMQDestination().getQualifiedName() + "."
+ " See
http://activemq.apache.org/producer-flow-control.html for more info");
}
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if (message.isExpired()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Expired message: " + message);
}
broker.getRoot().messageExpired(context, message,
null);
return;
}
}
}
}
doMessageSend(producerExchange, message);
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(),
message.getSize());
context.getConnection().dispatchAsync(ack);
}
}
--
View this message in context: http://activemq.2283324.n4.nabble.com/Producer-Window-Size-tp4668545.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.