You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/04/09 19:02:08 UTC

svn commit: r646437 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/

Author: rajdavies
Date: Wed Apr  9 10:02:04 2008
New Revision: 646437

URL: http://svn.apache.org/viewvc?rev=646437&view=rev
Log:
Updated to support sendTimeouts - see https://issues.apache.org/activemq/browse/AMQ-1517

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Wed Apr  9 10:02:04 2008
@@ -142,6 +142,7 @@
     private int closeTimeout = 15000;
     private boolean watchTopicAdvisories = true;
     private long warnAboutUnstartedConnectionTimeout = 500L;
+    private int sendTimeout =0;
 
     private final Transport transport;
     private final IdGenerator clientIdGenerator;
@@ -1518,6 +1519,21 @@
     public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
         this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
     }
+    
+    /**
+     * @return the send timeout handed to producers created from this connection's sessions; 0 (the default) means no timeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
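+     * @param sendTimeout the send timeout picked up by producers created afterwards; a value greater than 0 forces a synchronous send bounded by it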
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
 
     /**
      * Returns the time this connection was created
@@ -2091,5 +2107,4 @@
     protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
         connectionAudit.rollbackDuplicate(dispatcher, message);
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Wed Apr  9 10:02:04 2008
@@ -106,6 +106,7 @@
     private boolean watchTopicAdvisories = true;
     private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
     private long warnAboutUnstartedConnectionTimeout = 500L;
+    private int sendTimeout =0;
     private TransportListener transportListener;
 
     // /////////////////////////////////////////////
@@ -302,6 +303,7 @@
         connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
         connection.setProducerWindowSize(getProducerWindowSize());
         connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
+        connection.setSendTimeout(getSendTimeout());
         if (transportListener != null) {
             connection.addTransportListener(transportListener);
         }
@@ -533,6 +535,21 @@
     public MessageTransformer getTransformer() {
         return transformer;
     }
+    
+    /**
+     * @return the send timeout this factory sets on each connection it configures; 0 (the default) means no timeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
+     * @param sendTimeout the send timeout to set on connections configured by this factory from now on
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
+
 
     /**
      * Sets the transformer used to transform messages before they are sent on
@@ -627,6 +644,7 @@
         props.setProperty("statsEnabled", Boolean.toString(isStatsEnabled()));
         props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
         props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
+        props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
     }
 
     public boolean isUseCompression() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Wed Apr  9 10:02:04 2008
@@ -79,7 +79,7 @@
     private MessageTransformer transformer;
     private MemoryUsage producerWindow;
 
-    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException {
+    protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout) throws JMSException {
         super(session);
         this.info = new ProducerInfo(producerId);
         this.info.setWindowSize(session.connection.getProducerWindowSize());
@@ -104,6 +104,7 @@
         this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
         this.session.addProducer(this);
         this.session.asyncSendPacket(info);
+        this.setSendTimeout(sendTimeout);
         setTransformer(session.getTransformer());
     }
 
@@ -223,7 +224,7 @@
             }
         }
 
-        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow);
+        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow,sendTimeout);
 
         stats.onMessage();
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java Wed Apr  9 10:02:04 2008
@@ -35,6 +35,7 @@
     protected int defaultDeliveryMode;
     protected int defaultPriority;
     protected long defaultTimeToLive;
+    protected int sendTimeout=0;
 
     public ActiveMQMessageProducerSupport(ActiveMQSession session) {
         this.session = session;
@@ -305,4 +306,18 @@
 
 
     protected abstract void checkClosed() throws IllegalStateException;
+
+    /**
+     * @return the send timeout used when this producer sends a message; 0 (the default) means no timeout
+     */
+    public int getSendTimeout() {
+        return sendTimeout;
+    }
+
+    /**
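+     * @param sendTimeout the send timeout; when greater than 0 the session sends synchronously with this timeout, otherwise the regular send path is used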
+     */
+    public void setSendTimeout(int sendTimeout) {
+        this.sendTimeout = sendTimeout;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQQueueSender.java Wed Apr  9 10:02:04 2008
@@ -72,9 +72,9 @@
 
 public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
 
-    protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination)
+    protected ActiveMQQueueSender(ActiveMQSession session, ActiveMQDestination destination,int sendTimeout)
         throws JMSException {
-        super(session, session.getNextProducerId(), destination);
+        super(session, session.getNextProducerId(), destination,sendTimeout);
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Wed Apr  9 10:02:04 2008
@@ -812,8 +812,8 @@
             CustomDestination customDestination = (CustomDestination)destination;
             return customDestination.createProducer(this);
         }
-
-        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination));
+        int timeSendOut = connection.getSendTimeout();
+        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
     }
 
     /**
@@ -1293,8 +1293,8 @@
             CustomDestination customDestination = (CustomDestination)queue;
             return customDestination.createSender(this);
         }
-
-        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
+        int timeSendOut = connection.getSendTimeout();
+        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
     }
 
     /**
@@ -1390,7 +1390,8 @@
             CustomDestination customDestination = (CustomDestination)topic;
             return customDestination.createPublisher(this);
         }
-        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
+        int timeSendOut = connection.getSendTimeout();
+        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
     }
 
     /**
@@ -1576,7 +1577,7 @@
      * @throws JMSException
      */
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-                        MemoryUsage producerWindow) throws JMSException {
+                        MemoryUsage producerWindow, int sendTimeout) throws JMSException {
 
         checkClosed();
         if (destination.isTemporary() && connection.isDeleted(destination)) {
@@ -1623,7 +1624,7 @@
             if (this.debug) {
                 LOG.debug(getSessionId() + " sending message: " + msg);
             }
-            if (!msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
+            if (sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                 this.connection.asyncSendPacket(msg);
                 if (producerWindow != null) {
                     // Since we defer lots of the marshaling till we hit the
@@ -1637,7 +1638,11 @@
                     producerWindow.increaseUsage(size);
                 }
             } else {
-                this.connection.syncSendPacket(msg);
+                if (sendTimeout > 0) {
+                    this.connection.syncSendPacket(msg,sendTimeout);
+                }else {
+                    this.connection.syncSendPacket(msg);
+                }
             }
 
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java?rev=646437&r1=646436&r2=646437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQTopicPublisher.java Wed Apr  9 10:02:04 2008
@@ -85,8 +85,8 @@
         TopicPublisher {
 
     protected ActiveMQTopicPublisher(ActiveMQSession session,
-                                     ActiveMQDestination destination) throws JMSException {
-        super(session, session.getNextProducerId(), destination);
+                                     ActiveMQDestination destination, int sendTimeout) throws JMSException {
+        super(session, session.getNextProducerId(), destination,sendTimeout);
     }
 
     /**