You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/07/04 11:54:40 UTC
svn commit: r418966 - in /incubator/activemq/trunk/activemq-core: ./
src/main/java/org/apache/activemq/ src/test/java/org/apache/activemq/jndi/
Author: jstrachan
Date: Tue Jul 4 02:54:38 2006
New Revision: 418966
URL: http://svn.apache.org/viewvc?rev=418966&view=rev
Log:
fix for AMQ-792 to allow the async dispatch of messages to consumers to be easily configured & properly documented the javadoc. For more detail see http://activemq.org/site/consumer-dispatch-async.html
Modified:
incubator/activemq/trunk/activemq-core/ (props changed)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java
Propchange: incubator/activemq/trunk/activemq-core/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Tue Jul 4 02:54:38 2006
@@ -9,7 +9,6 @@
bin
junit*.properties
*.iml
-test
+ActiveMQConnections.dot
load.db
sdbStoreTest.db
-eclipse-classes
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Jul 4 02:54:38 2006
@@ -122,7 +122,7 @@
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
- protected boolean asyncDispatch = false;
+ protected boolean dispatchAsync = false;
protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = false;
@@ -274,7 +274,7 @@
||acknowledgeMode==Session.CLIENT_ACKNOWLEDGE;
return new ActiveMQSession(this,getNextSessionId(),(transacted?Session.SESSION_TRANSACTED
:(acknowledgeMode==Session.SESSION_TRANSACTED?Session.AUTO_ACKNOWLEDGE:acknowledgeMode)),
- asyncDispatch,alwaysSessionAsync);
+ dispatchAsync,alwaysSessionAsync);
}
/**
@@ -674,7 +674,7 @@
info.setSubcriptionName(subscriptionName);
info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages);
- info.setDispatchAsync(asyncDispatch);
+ info.setDispatchAsync(dispatchAsync);
// Allows the options on the destination to configure the consumerInfo
if( info.getDestination().getOptions()!=null ) {
@@ -727,8 +727,9 @@
}
/**
- * @param prefetchPolicy
- * The prefetchPolicy to set.
+ * Sets the <a
+ * href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
+ * policy</a> for consumers created by this connection.
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
@@ -1031,7 +1032,7 @@
info.setSelector(messageSelector);
info.setPrefetchSize(maxMessages);
info.setNoLocal(noLocal);
- info.setDispatchAsync(asyncDispatch);
+ info.setDispatchAsync(dispatchAsync);
// Allows the options on the destination to configure the consumerInfo
if( info.getDestination().getOptions()!=null ) {
@@ -1358,10 +1359,16 @@
/**
- * @param alwaysSessionAsync The alwaysSessionAsync to set.
+ * If this flag is set then a separate thread is not used for dispatching
+ * messages for each Session in the Connection. However, a separate thread
+ * is always used if there is more than one session, or the session isn't in
+ * auto acknowledge or duplicates ok mode
+ *
+ * @param alwaysSessionAsync
+ * The alwaysSessionAsync to set.
*/
- public void setAlwaysSessionAsync(boolean alwaysSessionAsync){
- this.alwaysSessionAsync=alwaysSessionAsync;
+ public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
+ this.alwaysSessionAsync = alwaysSessionAsync;
}
/**
@@ -1603,12 +1610,28 @@
}
- public boolean isAsyncDispatch() {
- return asyncDispatch;
+ public boolean isDispatchAsync() {
+ return dispatchAsync;
}
- public void setAsyncDispatch(boolean asyncDispatch) {
- this.asyncDispatch = asyncDispatch;
+ /**
+ * Enables or disables the default setting of whether or not consumers have
+ * their messages <a
+ * href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
+ * synchronously or asynchronously by the broker</a>.
+ *
+ * For non-durable topics for example we typically dispatch synchronously by
+ * default to minimize context switches which boost performance. However
+ * sometimes its better to go slower to ensure that a single blocked
+ * consumer socket does not block delivery to other consumers.
+ *
+ * @param asyncDispatch
+ * If true then consumers created on this connection will default
+ * to having their messages dispatched asynchronously. The
+ * default value is false.
+ */
+ public void setDispatchAsync(boolean asyncDispatch) {
+ this.dispatchAsync = asyncDispatch;
}
public boolean isObjectMessageSerializationDefered() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Tue Jul 4 02:54:38 2006
@@ -74,7 +74,7 @@
private boolean copyMessageOnSend = true;
private boolean useCompression = false;
private boolean objectMessageSerializationDefered = false;
- protected boolean asyncDispatch = false;
+ protected boolean dispatchAsync = false;
protected boolean alwaysSessionAsync=true;
private boolean useAsyncSend = false;
private boolean optimizeAcknowledge = false;
@@ -227,7 +227,7 @@
connection.setCopyMessageOnSend(isCopyMessageOnSend());
connection.setUseCompression(isUseCompression());
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
- connection.setAsyncDispatch(isAsyncDispatch());
+ connection.setDispatchAsync(isDispatchAsync());
connection.setUseAsyncSend(isUseAsyncSend());
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
@@ -337,6 +337,11 @@
return prefetchPolicy;
}
+ /**
+ * Sets the <a
+ * href="http://incubator.apache.org/activemq/what-is-the-prefetch-limit-for.html">prefetch
+ * policy</a> for consumers created by this connection.
+ */
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
@@ -419,7 +424,7 @@
}
public void populateProperties(Properties props) {
- props.setProperty("asyncDispatch", Boolean.toString(isAsyncDispatch()));
+ props.setProperty("dispatchAsync", Boolean.toString(isDispatchAsync()));
if (getBrokerURL() != null) {
props.setProperty(Context.PROVIDER_URL, getBrokerURL());
@@ -472,12 +477,28 @@
this.objectMessageSerializationDefered = objectMessageSerializationDefered;
}
- public boolean isAsyncDispatch() {
- return asyncDispatch;
+ public boolean isDispatchAsync() {
+ return dispatchAsync;
}
- public void setAsyncDispatch(boolean asyncDispatch) {
- this.asyncDispatch = asyncDispatch;
+ /**
+ * Enables or disables the default setting of whether or not consumers have
+ * their messages <a
+ * href="http://incubator.apache.org/activemq/consumer-dispatch-async.html">dispatched
+ * synchronously or asynchronously by the broker</a>.
+ *
+ * For non-durable topics for example we typically dispatch synchronously by
+ * default to minimize context switches which boost performance. However
+ * sometimes its better to go slower to ensure that a single blocked
+ * consumer socket does not block delivery to other consumers.
+ *
+ * @param asyncDispatch
+ * If true then consumers created on this connection will default
+ * to having their messages dispatched asynchronously. The
+ * default value is false.
+ */
+ public void setDispatchAsync(boolean asyncDispatch) {
+ this.dispatchAsync = asyncDispatch;
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java Tue Jul 4 02:54:38 2006
@@ -75,6 +75,6 @@
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
- return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, asyncDispatch);
+ return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, dispatchAsync);
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java?rev=418966&r1=418965&r2=418966&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/jndi/ObjectFactoryTest.java Tue Jul 4 02:54:38 2006
@@ -27,7 +27,7 @@
public void testConnectionFactory() throws Exception {
// Create sample connection factory
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
- factory.setAsyncDispatch(false);
+ factory.setDispatchAsync(false);
factory.setBrokerURL("vm://test");
factory.setClientID("test");
factory.setCopyMessageOnSend(false);
@@ -53,7 +53,7 @@
temp = (ActiveMQConnectionFactory)refFactory.getObjectInstance(ref, null, null, null);
// Check settings
- assertEquals(factory.isAsyncDispatch(), temp.isAsyncDispatch());
+ assertEquals(factory.isDispatchAsync(), temp.isDispatchAsync());
assertEquals(factory.getBrokerURL(), temp.getBrokerURL());
assertEquals(factory.getClientID(), temp.getClientID());
assertEquals(factory.isCopyMessageOnSend(), temp.isCopyMessageOnSend());