You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 19:44:04 UTC
svn commit: r516139 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
ActiveMQConnection.java ActiveMQConnectionFactory.java
ActiveMQMessageProducer.java ActiveMQSession.java
Author: chirino
Date: Thu Mar 8 10:44:03 2007
New Revision: 516139
URL: http://svn.apache.org/viewvc?view=rev&rev=516139
Log:
Adding the client side bits needed to implement producer flow control using a window. Currently disabled since
the server side bits still need implementing.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Mar 8 10:44:03 2007
@@ -76,6 +76,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
@@ -156,6 +157,7 @@
// Maps ConsumerIds to ActiveMQConsumer objects
private final ConcurrentHashMap dispatchers = new ConcurrentHashMap();
+ private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
private final SessionId connectionSessionId;
private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@@ -168,6 +170,7 @@
private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
private BrokerInfo brokerInfo;
private IOException firstFailureError;
+ private int producerWindowSize=ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
// Assume that protocol is the latest. Change to the actual protocol
// version when a WireFormatInfo is received.
@@ -1515,6 +1518,14 @@
Transport getTransport() {
return transport;
}
+
+ public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
+ producers.put(producerId, producer);
+ }
+ public void removeProducer(ProducerId producerId) {
+ producers.remove(producerId);
+ }
+
public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
dispatchers.put(consumerId, dispatcher);
@@ -1546,6 +1557,12 @@
}
dispatcher.dispatch(md);
}
+ } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) {
+ ProducerAck pa = (ProducerAck) command;
+ ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
+ if( producer!=null ) {
+ producer.onProducerAck(pa);
+ }
} else if ( command.isBrokerInfo() ) {
this.brokerInfo = (BrokerInfo)command;
brokerInfoReceived.countDown();
@@ -2006,4 +2023,16 @@
public int getProtocolVersion() {
return protocolVersion.get();
}
+
+
+ public int getProducerWindowSize() {
+ return producerWindowSize;
+ }
+
+
+ public void setProducerWindowSize(int producerWindowSize) {
+ this.producerWindowSize = producerWindowSize;
+ }
+
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Thu Mar 8 10:44:03 2007
@@ -17,6 +17,24 @@
*/
package org.apache.activemq;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.Context;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
@@ -28,23 +46,6 @@
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
-import org.apache.activemq.blob.BlobTransferPolicy;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.naming.Context;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
/**
* A ConnectionFactory is an an Administered object, and is used for creating
@@ -60,6 +61,7 @@
public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
public static final String DEFAULT_USER = null;
public static final String DEFAULT_PASSWORD = null;
+ public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
private IdGenerator clientIdGenerator;
private String clientIDPrefix;
@@ -90,6 +92,7 @@
private boolean alwaysSyncSend;
private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
+ private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE;
static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
public Thread newThread(Runnable run) {
@@ -263,7 +266,7 @@
connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
connection.setWatchTopicAdvisories(watchTopicAdvisories);
-
+ connection.setProducerWindowSize(producerWindowSize);
transport.start();
if( clientID !=null )
@@ -590,7 +593,7 @@
props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
-
+ props.setProperty("producerWindowSize", Integer.toString(producerWindowSize));
}
public boolean isUseCompression() {
@@ -749,4 +752,12 @@
public void setStatsEnabled(boolean statsEnabled){
this.factoryStats.setEnabled(statsEnabled);
}
+
+ synchronized public int getProducerWindowSize() {
+ return producerWindowSize;
+ }
+
+ synchronized public void setProducerWindowSize(int producerWindowSize) {
+ this.producerWindowSize = producerWindowSize;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Mar 8 10:44:03 2007
@@ -27,12 +27,16 @@
import javax.jms.MessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.management.JMSProducerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.util.IntrospectionSupport;
+import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -83,12 +87,25 @@
private long defaultTimeToLive;
private long startTime;
private MessageTransformer transformer;
+ private UsageManager producerWindow;
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
throws JMSException {
this.session = session;
this.info = new ProducerInfo(producerId);
+ this.info.setWindowSize(session.connection.getProducerWindowSize());
+ if (destination!=null && destination.getOptions() != null) {
+ HashMap options = new HashMap(destination.getOptions());
+ IntrospectionSupport.setProperties(this.info, options, "producer.");
+ }
this.info.setDestination(destination);
+
+ // Enable producer window flow control if protocol > 3 and the window size > 0
+ if( session.connection.getProtocolVersion()>=3 && this.info.getWindowSize()>0 ) {
+ producerWindow = new UsageManager("Producer Window: "+producerId);
+ producerWindow.setLimit(this.info.getWindowSize());
+ }
+
this.disableMessageID = false;
this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
@@ -470,7 +487,21 @@
message = transformedMessage;
}
}
- this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
+
+ if( producerWindow!=null ) {
+ try {
+ producerWindow.waitForSpace();
+ } catch (InterruptedException e) {
+ throw new JMSException("Send aborted due to thread interrupt.");
+ }
+ }
+
+ int size = this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
+
+ if( producerWindow!=null ) {
+ producerWindow.increaseUsage(size);
+ }
+
stats.onMessage();
}
@@ -524,5 +555,11 @@
public String toString() {
return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }";
}
+
+ public void onProducerAck(ProducerAck pa) {
+ if( this.producerWindow!=null ) {
+ this.producerWindow.decreaseUsage(pa.getSize());
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Mar 8 10:44:03 2007
@@ -1478,6 +1478,7 @@
*/
protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
this.producers.add(producer);
+ this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
}
/**
@@ -1488,6 +1489,7 @@
* @throws JMSException
*/
protected void removeProducer(ActiveMQMessageProducer producer) {
+ this.connection.removeProducer(producer.getProducerInfo().getProducerId());
this.producers.remove(producer);
}
@@ -1546,7 +1548,7 @@
* message expiration.
* @throws JMSException
*/
- protected void send(ActiveMQMessageProducer producer,
+ protected int send(ActiveMQMessageProducer producer,
ActiveMQDestination destination,Message message,int deliveryMode,
int priority,long timeToLive) throws JMSException{
checkClosed();
@@ -1599,6 +1601,12 @@
}else{
this.connection.syncSendPacket(msg);
}
+
+ // Since we defer lots of the marshaling till we hit the wire, this might not
+ // provide and accurate size. We may change over to doing more aggressive marshaling,
+ // to get more accurate sizes.. this is more important once users start using producer window
+ // flow control.
+ return msg.getSize();
}
}