You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/09 19:02:08 UTC
svn commit: r646437 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
Author: rajdavies
Date: Wed Apr 9 10:02:04 2008
New Revision: 646437
URL: http://svn.apache.org/viewvc?rev=646437&view=rev
Log:
Updated to support sendTimeouts - see https://issues.apache.org/activemq/browse/AMQ-1517
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Apr 9 10:02:04 2008
@@ -142,6 +142,7 @@
private int closeTimeout = 15000;
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
+ private int sendTimeout =0;
private final Transport transport;
private final IdGenerator clientIdGenerator;
@@ -1518,6 +1519,21 @@
public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
}
+
+ /**
+ * @return the sendTimeout
+ */
+ public int getSendTimeout() {
+ return sendTimeout;
+ }
+
+ /**
+ * @param sendTimeout the sendTimeout to set
+ */
+ public void setSendTimeout(int sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
+
/**
* Returns the time this connection was created
@@ -2091,5 +2107,4 @@
protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
connectionAudit.rollbackDuplicate(dispatcher, message);
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Apr 9 10:02:04 2008
@@ -106,6 +106,7 @@
private boolean watchTopicAdvisories = true;
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
+ private int sendTimeout =0;
private TransportListener transportListener;
// /////////////////////////////////////////////
@@ -302,6 +303,7 @@
connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
connection.setProducerWindowSize(getProducerWindowSize());
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
+ connection.setSendTimeout(getSendTimeout());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
@@ -533,6 +535,21 @@
public MessageTransformer getTransformer() {
return transformer;
}
+
+ /**
+ * @return the sendTimeout
+ */
+ public int getSendTimeout() {
+ return sendTimeout;
+ }
+
+ /**
+ * @param sendTimeout the sendTimeout to set
+ */
+ public void setSendTimeout(int sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
+
/**
* Sets the transformer used to transform messages before they are sent on
@@ -627,6 +644,7 @@
props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
+ props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
}
public boolean isUseCompression() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Wed Apr 9 10:02:04 2008
@@ -79,7 +79,7 @@
private MessageTransformer transformer;
private MemoryUsage producerWindow;
- protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException {
+ protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
super(session);
this.info = new ProducerInfo(producerId);
this.info.setWindowSize(session.connection.getProducerWindowSize());
@@ -104,6 +104,7 @@
this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
this.session.addProducer(this);
this.session.asyncSendPacket(info);
+ this.setSendTimeout(sendTimeout);
setTransformer(session.getTransformer());
}
@@ -223,7 +224,7 @@
}
}
- this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow);
+ this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
stats.onMessage();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Wed Apr 9 10:02:04 2008
@@ -35,6 +35,7 @@
protected int defaultDeliveryMode;
protected int defaultPriority;
protected long defaultTimeToLive;
+ protected int sendTimeout=0;
public ActiveMQMessageProducerSupport(ActiveMQSession session) {
this.session = session;
@@ -305,4 +306,18 @@
protected abstract void checkClosed() throws IllegalStateException;
+
+ /**
+ * @return the sendTimeout
+ */
+ public int getSendTimeout() {
+ return sendTimeout;
+ }
+
+ /**
+ * @param sendTimeout the sendTimeout to set
+ */
+ public void setSendTimeout(int sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java Wed Apr 9 10:02:04 2008
@@ -72,9 +72,9 @@
public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
- protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination)
+ protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination,int sendTimeout)
throws JMSException {
- super(session, session.getNextProducerId(), destination);
+ super(session, session.getNextProducerId(), destination,sendTimeout);
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Apr 9 10:02:04 2008
@@ -812,8 +812,8 @@
CustomDestination customDestination = (CustomDestination)destination;
return customDestination.createProducer(this);
}
-
- return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination));
+ int timeSendOut = connection.getSendTimeout();
+ return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
}
/**
@@ -1293,8 +1293,8 @@
CustomDestination customDestination = (CustomDestination)queue;
return customDestination.createSender(this);
}
-
- return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
+ int timeSendOut = connection.getSendTimeout();
+ return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
}
/**
@@ -1390,7 +1390,8 @@
CustomDestination customDestination = (CustomDestination)topic;
return customDestination.createPublisher(this);
}
- return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
+ int timeSendOut = connection.getSendTimeout();
+ return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
}
/**
@@ -1576,7 +1577,7 @@
* @throws JMSException
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
- MemoryUsage producerWindow) throws JMSException {
+ MemoryUsage producerWindow, int sendTimeout) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
@@ -1623,7 +1624,7 @@
if (this.debug) {
LOG.debug(getSessionId() + " sending message: " + msg);
}
- if (!msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
+ if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
@@ -1637,7 +1638,11 @@
producerWindow.increaseUsage(size);
}
} else {
- this.connection.syncSendPacket(msg);
+ if (sendTimeout > 0) {
+ this.connection.syncSendPacket(msg,sendTimeout);
+ }else {
+ this.connection.syncSendPacket(msg);
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java Wed Apr 9 10:02:04 2008
@@ -85,8 +85,8 @@
TopicPublisher {
protected ActiveMQTopicPublisher(ActiveMQSession session,
- ActiveMQDestination destination) throws JMSException {
- super(session, session.getNextProducerId(), destination);
+ ActiveMQDestination destination, int sendTimeout) throws JMSException {
+ super(session, session.getNextProducerId(), destination,sendTimeout);
}
/**