You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/03/08 19:44:04 UTC

svn commit: r516139 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ActiveMQConnection.java ActiveMQConnectionFactory.java ActiveMQMessageProducer.java ActiveMQSession.java

Author: chirino
Date: Thu Mar  8 10:44:03 2007
New Revision: 516139

URL: http://svn.apache.org/viewvc?view=rev&rev=516139
Log:
Adding the client side bits needed to implement producer flow control using a window.  Currently disabled since 
the server side bits still need implementing.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Mar  8 10:44:03 2007
@@ -76,6 +76,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
@@ -156,6 +157,7 @@
 
     // Maps ConsumerIds to ActiveMQConsumer objects
     private final ConcurrentHashMap dispatchers = new ConcurrentHashMap();
+    private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
     private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
     private final SessionId connectionSessionId;
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@@ -168,6 +170,7 @@
     private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
     private BrokerInfo brokerInfo;
     private IOException firstFailureError;
+    private int producerWindowSize=ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
     
     // Assume that protocol is the latest.  Change to the actual protocol
     // version when a WireFormatInfo is received.
@@ -1515,6 +1518,14 @@
     Transport getTransport() {
         return transport;
     }
+    
+	public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
+		producers.put(producerId, producer);		
+	}
+	public void removeProducer(ProducerId producerId) {
+		producers.remove(producerId);		
+	}
+
 
     public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
         dispatchers.put(consumerId, dispatcher); 
@@ -1546,6 +1557,12 @@
                     }
                     dispatcher.dispatch(md);
                 }
+            } else if (command.getDataStructureType() == ProducerAck.DATA_STRUCTURE_TYPE ) {
+            	ProducerAck pa = (ProducerAck) command;
+            	ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
+            	if( producer!=null ) {
+            		producer.onProducerAck(pa);
+            	}
             } else if ( command.isBrokerInfo() ) {
                 this.brokerInfo = (BrokerInfo)command;
                 brokerInfoReceived.countDown();
@@ -2006,4 +2023,16 @@
 	public int getProtocolVersion() {
 		return protocolVersion.get();
 	}
+
+
+	public int getProducerWindowSize() {
+		return producerWindowSize;
+	}
+
+
+	public void setProducerWindowSize(int producerWindowSize) {
+		this.producerWindowSize = producerWindowSize;
+	}
+
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Thu Mar  8 10:44:03 2007
@@ -17,6 +17,24 @@
  */
 package org.apache.activemq;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.Context;
+
+import org.apache.activemq.blob.BlobTransferPolicy;
 import org.apache.activemq.jndi.JNDIBaseStorable;
 import org.apache.activemq.management.JMSStatsImpl;
 import org.apache.activemq.management.StatsCapable;
@@ -28,23 +46,6 @@
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.util.URISupport.CompositeData;
-import org.apache.activemq.blob.BlobTransferPolicy;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.TopicConnection;
-import javax.jms.TopicConnectionFactory;
-import javax.naming.Context;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 
 /**
  * A ConnectionFactory is an an Administered object, and is used for creating
@@ -60,6 +61,7 @@
     public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
     public static final String DEFAULT_USER = null;
     public static final String DEFAULT_PASSWORD = null;
+    public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
 
     private IdGenerator clientIdGenerator;
     private String clientIDPrefix;
@@ -90,6 +92,7 @@
     private boolean alwaysSyncSend;
     private boolean useSyncSend=false;
     private boolean watchTopicAdvisories=true;
+    private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE;
 
     static protected final Executor DEFAULT_CONNECTION_EXECUTOR = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
             public Thread newThread(Runnable run) {
@@ -263,7 +266,7 @@
             connection.setTransformer(getTransformer());
             connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
             connection.setWatchTopicAdvisories(watchTopicAdvisories);
-            
+            connection.setProducerWindowSize(producerWindowSize);
             transport.start();
 
             if( clientID !=null )
@@ -590,7 +593,7 @@
         props.setProperty("optimizeAcknowledge", Boolean.toString(isOptimizeAcknowledge()));
         props.setProperty("statsEnabled",Boolean.toString(isStatsEnabled()));
         props.setProperty("alwaysSyncSend",Boolean.toString(isAlwaysSyncSend()));
-
+        props.setProperty("producerWindowSize", Integer.toString(producerWindowSize));
     }
 
     public boolean isUseCompression() {
@@ -749,4 +752,12 @@
     public void setStatsEnabled(boolean statsEnabled){
         this.factoryStats.setEnabled(statsEnabled);
     }
+
+	synchronized public int getProducerWindowSize() {
+		return producerWindowSize;
+	}
+
+	synchronized public void setProducerWindowSize(int producerWindowSize) {
+		this.producerWindowSize = producerWindowSize;
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Mar  8 10:44:03 2007
@@ -27,12 +27,16 @@
 import javax.jms.MessageProducer;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerAck;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.management.JMSProducerStatsImpl;
 import org.apache.activemq.management.StatsCapable;
 import org.apache.activemq.management.StatsImpl;
+import org.apache.activemq.memory.UsageManager;
+import org.apache.activemq.util.IntrospectionSupport;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -83,12 +87,25 @@
     private long defaultTimeToLive;
     private long startTime;
     private MessageTransformer transformer;
+    private UsageManager producerWindow;
 
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
             throws JMSException {
         this.session = session;
         this.info = new ProducerInfo(producerId);
+        this.info.setWindowSize(session.connection.getProducerWindowSize());        
+        if (destination!=null && destination.getOptions() != null) {
+            HashMap options = new HashMap(destination.getOptions());
+            IntrospectionSupport.setProperties(this.info, options, "producer.");
+        }
         this.info.setDestination(destination);
+        
+        // Enable producer window flow control if protocol > 3 and the window size > 0
+        if( session.connection.getProtocolVersion()>=3 && this.info.getWindowSize()>0 ) {
+        	producerWindow = new UsageManager("Producer Window: "+producerId);
+        	producerWindow.setLimit(this.info.getWindowSize());
+        }
+        
         this.disableMessageID = false;
         this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
         this.defaultDeliveryMode = Message.DEFAULT_DELIVERY_MODE;
@@ -470,7 +487,21 @@
                 message = transformedMessage;
             }
         }
-        this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
+        
+        if( producerWindow!=null ) {
+        	try {
+				producerWindow.waitForSpace();
+			} catch (InterruptedException e) {
+				throw new JMSException("Send aborted due to thread interrupt.");
+			}
+        }
+        
+        int size = this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
+
+        if( producerWindow!=null ) {
+			producerWindow.increaseUsage(size);
+        }
+        
         stats.onMessage();            
     }
 
@@ -524,5 +555,11 @@
     public String toString() {
         return "ActiveMQMessageProducer { value=" +info.getProducerId()+" }";
     }
+
+	public void onProducerAck(ProducerAck pa) {
+		if( this.producerWindow!=null ) {
+			this.producerWindow.decreaseUsage(pa.getSize());
+		}
+	}
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=516139&r1=516138&r2=516139
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Mar  8 10:44:03 2007
@@ -1478,6 +1478,7 @@
      */
     protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
         this.producers.add(producer);
+        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
     }
 
     /**
@@ -1488,6 +1489,7 @@
      * @throws JMSException
      */
     protected void removeProducer(ActiveMQMessageProducer producer) {
+        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
         this.producers.remove(producer);
     }
 
@@ -1546,7 +1548,7 @@
      *            message expiration.
      * @throws JMSException
      */
-    protected void send(ActiveMQMessageProducer producer,
+    protected int send(ActiveMQMessageProducer producer,
 	        ActiveMQDestination destination,Message message,int deliveryMode,
 	        int priority,long timeToLive) throws JMSException{
 		checkClosed();
@@ -1599,6 +1601,12 @@
             }else{
                 this.connection.syncSendPacket(msg);
             }
+
+			// Since we defer lots of the marshaling till we hit the wire, this might not 
+			// provide and accurate size.  We may change over to doing more aggressive marshaling,
+			// to get more accurate sizes.. this is more important once users start using producer window
+			// flow control.
+			return msg.getSize();
 		}
 	}