You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/07/04 11:54:40 UTC

svn commit: r418966 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/test/java/org/apache/activemq/jndi/

Author: jstrachan
Date: Tue Jul  4 02:54:38 2006
New Revision: 418966

URL: http://svn.apache.org/viewvc?rev=418966&view=rev
Log:
Fix for AMQ-792: allow the async dispatch of messages to consumers to be easily configured, and properly document it in the javadoc. For more detail see http://activemq.org/site/consumer-dispatch-async.html

Modified:
    incubator/activemq/trunk/activemq-core/   (props changed)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java

Propchange: incubator/activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Jul  4 02:54:38 2006
@@ -9,7 +9,6 @@
 bin
 junit*.properties
 *.iml
-test
+ActiveMQConnections.dot
 load.db
 sdbStoreTest.db
-eclipse-classes

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jul  4 02:54:38 2006
@@ -122,7 +122,7 @@
     private boolean copyMessageOnSend = true;
     private boolean useCompression = false;
     private boolean objectMessageSerializationDefered = false;
-    protected boolean asyncDispatch = false;
+    protected boolean dispatchAsync = false;
     protected boolean alwaysSessionAsync=true;
     private boolean useAsyncSend = false;
     private boolean optimizeAcknowledge = false;
@@ -274,7 +274,7 @@
                         ||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
         return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
                         :(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
-                        asyncDispatch,alwaysSessionAsync);
+                        dispatchAsync,alwaysSessionAsync);
     }
 
     /**
@@ -674,7 +674,7 @@
         info.setSubcriptionName(subscriptionName);
         info.setSelector(messageSelector);
         info.setPrefetchSize(maxMessages);
-        info.setDispatchAsync(asyncDispatch);
+        info.setDispatchAsync(dispatchAsync);
 
         // Allows the options on the destination to configure the consumerInfo
         if( info.getDestination().getOptions()!=null ) {
@@ -727,8 +727,9 @@
     }
 
     /**
-     * @param prefetchPolicy
-     *            The prefetchPolicy to set.
+     * Sets the <a
+     * href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
+     * policy</a> for consumers created by this connection.
      */
     public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
         this.prefetchPolicy = prefetchPolicy;
@@ -1031,7 +1032,7 @@
         info.setSelector(messageSelector);
         info.setPrefetchSize(maxMessages);
         info.setNoLocal(noLocal);
-        info.setDispatchAsync(asyncDispatch);
+        info.setDispatchAsync(dispatchAsync);
         
         // Allows the options on the destination to configure the consumerInfo
         if( info.getDestination().getOptions()!=null ) {
@@ -1358,10 +1359,16 @@
 
 
     /**
-     * @param alwaysSessionAsync The alwaysSessionAsync to set.
+     * If this flag is not set then a separate thread is not used for dispatching
+     * messages for each Session in the Connection. However, a separate thread
+     * is always used if there is more than one session, or the session isn't in
+     * auto acknowledge or duplicates ok mode.
+     * 
+     * @param alwaysSessionAsync
+     *            The alwaysSessionAsync to set.
      */
-    public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
-        this.alwaysSessionAsync=alwaysSessionAsync;
+    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
+        this.alwaysSessionAsync = alwaysSessionAsync;
     }
 
     /**
@@ -1603,12 +1610,28 @@
 
     }
 
-    public boolean isAsyncDispatch() {
-        return asyncDispatch;
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
     }
 
-    public void setAsyncDispatch(boolean asyncDispatch) {
-        this.asyncDispatch = asyncDispatch;
+    /**
+     * Enables or disables the default setting of whether or not consumers have
+     * their messages <a
+     * href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
+     * synchronously or asynchronously by the broker</a>.
+     * 
+     * For non-durable topics for example we typically dispatch synchronously by
+     * default to minimize context switches, which boosts performance. However,
+     * sometimes it's better to go slower to ensure that a single blocked
+     * consumer socket does not block delivery to other consumers.
+     * 
+     * @param asyncDispatch
+     *            If true then consumers created on this connection will default
+     *            to having their messages dispatched asynchronously. The
+     *            default value is false.
+     */
+    public void setDispatchAsync(boolean asyncDispatch) {
+        this.dispatchAsync = asyncDispatch;
     }
 
     public boolean isObjectMessageSerializationDefered() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jul  4 02:54:38 2006
@@ -74,7 +74,7 @@
     private boolean copyMessageOnSend = true;
     private boolean useCompression = false;
     private boolean objectMessageSerializationDefered = false;
-    protected boolean asyncDispatch = false;
+    protected boolean dispatchAsync = false;
     protected boolean alwaysSessionAsync=true;
     private boolean useAsyncSend = false;
     private boolean optimizeAcknowledge = false;
@@ -227,7 +227,7 @@
             connection.setCopyMessageOnSend(isCopyMessageOnSend());
             connection.setUseCompression(isUseCompression());
             connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
-            connection.setAsyncDispatch(isAsyncDispatch());
+            connection.setDispatchAsync(isDispatchAsync());
             connection.setUseAsyncSend(isUseAsyncSend());
             connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
             connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
@@ -337,6 +337,11 @@
         return prefetchPolicy;
     }
 
+    /**
+     * Sets the <a
+     * href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
+     * policy</a> for consumers created by this connection.
+     */
     public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
         this.prefetchPolicy = prefetchPolicy;
     }
@@ -419,7 +424,7 @@
     }
 
     public void populateProperties(Properties props) {
-        props.setProperty("asyncDispatch", Boolean.toString(isAsyncDispatch()));
+        props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
 
         if (getBrokerURL() != null) {
             props.setProperty(Context.PROVIDER_URL, getBrokerURL());
@@ -472,12 +477,28 @@
         this.objectMessageSerializationDefered = objectMessageSerializationDefered;
     }
 
-    public boolean isAsyncDispatch() {
-        return asyncDispatch;
+    public boolean isDispatchAsync() {
+        return dispatchAsync;
     }
 
-    public void setAsyncDispatch(boolean asyncDispatch) {
-        this.asyncDispatch = asyncDispatch;
+    /**
+     * Enables or disables the default setting of whether or not consumers have
+     * their messages <a
+     * href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
+     * synchronously or asynchronously by the broker</a>.
+     * 
+     * For non-durable topics for example we typically dispatch synchronously by
+     * default to minimize context switches, which boosts performance. However,
+     * sometimes it's better to go slower to ensure that a single blocked
+     * consumer socket does not block delivery to other consumers.
+     * 
+     * @param asyncDispatch
+     *            If true then consumers created on this connection will default
+     *            to having their messages dispatched asynchronously. The
+     *            default value is false.
+     */
+    public void setDispatchAsync(boolean asyncDispatch) {
+        this.dispatchAsync = asyncDispatch;
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java Tue Jul  4 02:54:38 2006
@@ -75,6 +75,6 @@
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
         ensureConnectionInfoSent();
-        return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch);
+        return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java Tue Jul  4 02:54:38 2006
@@ -27,7 +27,7 @@
     public void testConnectionFactory() throws Exception {
         // Create sample connection factory
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
-        factory.setAsyncDispatch(false);
+        factory.setDispatchAsync(false);
         factory.setBrokerURL("vm://test");
         factory.setClientID("test");
         factory.setCopyMessageOnSend(false);
@@ -53,7 +53,7 @@
         temp = (ActiveMQConnectionFactory)refFactory.getObjectInstance(ref, null, null, null);
 
         // Check settings
-        assertEquals(factory.isAsyncDispatch(), temp.isAsyncDispatch());
+        assertEquals(factory.isDispatchAsync(), temp.isDispatchAsync());
         assertEquals(factory.getBrokerURL(), temp.getBrokerURL());
         assertEquals(factory.getClientID(), temp.getClientID());
         assertEquals(factory.isCopyMessageOnSend(), temp.isCopyMessageOnSend());