You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/10 18:03:33 UTC

svn commit: r384853 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: ./ transport/ transport/failover/ transport/fanout/ transport/mock/ transport/vm/

Author: rajdavies
Date: Fri Mar 10 09:03:31 2006
New Revision: 384853

URL: http://svn.apache.org/viewcvs?rev=384853&view=rev
Log:
Added request(Command, timeout) to transport and added a timeout on
close from ActiveMQConnection

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Mar 10 09:03:31 2006
@@ -116,6 +116,7 @@
     protected boolean asyncDispatch = true;
     private boolean useAsyncSend = false;
     private boolean useRetroactiveConsumer;
+    private int closeTimeout = 15000;
     
     private long flowControlSleepTime = 0;
     private final JMSConnectionStatsImpl stats;
@@ -541,7 +542,7 @@
                 
                 
                 if (isConnectionInfoSentToBroker) {
-                    syncSendPacket(info.createRemoveCommand());
+                    syncSendPacket(info.createRemoveCommand(),closeTimeout);
                 }
 
                 asyncSendPacket(new ShutdownInfo());
@@ -735,6 +736,22 @@
     }
 
     /**
+     * @return Returns the closeTimeout.
+     */
+    public int getCloseTimeout(){
+        return closeTimeout;
+    }
+
+
+    /**
+     * @param closeTimeout The closeTimeout to set.
+     */
+    public void setCloseTimeout(int closeTimeout){
+        this.closeTimeout=closeTimeout;
+    }
+
+
+    /**
      * 
      * @return Returns the onSendPrepareMessageBody.
      */
@@ -1064,6 +1081,41 @@
 
             try {
                 Response response = this.transport.request(command);
+                if (response.isException()) {
+                    ExceptionResponse er = (ExceptionResponse) response;
+                    if (er.getException() instanceof JMSException)
+                        throw (JMSException) er.getException();
+                    else
+                        throw JMSExceptionSupport.create(er.getException());
+                }
+                return response;
+            } catch (IOException e) {
+                throw JMSExceptionSupport.create(e);
+            }
+        }
+    }
+    
+    /**
+     * Send a packet through a Connection - for internal use only
+     * 
+     * @param command
+     * @return
+     * @throws JMSException
+     */
+    public Response syncSendPacket(Command command, int timeout) throws JMSException {
+        if (isClosed()) {
+            throw new ConnectionClosedException();
+        } else {
+
+            if (command.isMessage() && flowControlSleepTime > 0) {
+                try {
+                    Thread.sleep(flowControlSleepTime);
+                } catch (InterruptedException e) {
+                }
+            }
+
+            try {
+                Response response = this.transport.request(command,timeout);
                 if (response.isException()) {
                     ExceptionResponse er = (ExceptionResponse) response;
                     if (er.getException() instanceof JMSException)

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Mar 10 09:03:31 2006
@@ -81,6 +81,7 @@
     private boolean objectMessageSerializationDefered = false;
     protected boolean asyncDispatch = true;
     private boolean useAsyncSend = false;
+    private int closeTimeout = 15000;
     private boolean useRetroactiveConsumer;
 
     JMSStatsImpl factoryStats = new JMSStatsImpl();
@@ -415,6 +416,7 @@
         props.setProperty("useCompression", Boolean.toString(isUseCompression()));
         props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
         props.setProperty("userName", getUserName());
+        props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
     }
 
     public boolean isOnSendPrepareMessageBody() {
@@ -447,5 +449,19 @@
 
     public void setAsyncDispatch(boolean asyncDispatch) {
         this.asyncDispatch = asyncDispatch;
+    }
+
+    /**
+     * @return Returns the closeTimeout.
+     */
+    public int getCloseTimeout(){
+        return closeTimeout;
+    }
+
+    /**
+     * @param closeTimeout The closeTimeout to set.
+     */
+    public void setCloseTimeout(int closeTimeout){
+        this.closeTimeout=closeTimeout;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Mar 10 09:03:31 2006
@@ -19,6 +19,8 @@
 import edu.emory.mathcs.backport.java.util.concurrent.Callable;
 import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
 import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
 
 import org.apache.activemq.command.Response;
 import org.apache.activemq.util.IOExceptionSupport;
@@ -49,6 +51,23 @@
             } else {
                 throw IOExceptionSupport.create(target);
             }
+        }
+    }
+    
+    public synchronized Response getResult(int timeout) throws IOException {
+        try {
+            return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException("Interrupted.");
+        } catch (ExecutionException e) {
+            Throwable target = e.getCause();
+            if( target instanceof IOException ) {
+                throw (IOException)target;
+            } else {
+                throw IOExceptionSupport.create(target);
+            }
+        }catch(TimeoutException e){
+            return null;
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Fri Mar 10 09:03:31 2006
@@ -51,6 +51,12 @@
         }
     }
     
+    public Response request(Command command,int timeout) throws IOException {
+        synchronized(writeMutex){
+            return next.request(command,timeout);
+        }
+    }
+    
     public String toString() {
         return next.toString();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Mar 10 09:03:31 2006
@@ -67,6 +67,11 @@
         return response.getResult();
     }
     
+    public Response request(Command command,int timeout) throws IOException {
+        FutureResponse response = asyncRequest(command);
+        return response.getResult(timeout);
+    }
+    
     public void onCommand(Command command) {
         boolean debug = log.isDebugEnabled();
         if( command.isResponse() ) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Mar 10 09:03:31 2006
@@ -32,30 +32,53 @@
 
     /**
      * A one way asynchronous send
+     * @param command 
+     * @throws IOException 
      */
     public void oneway(Command command) throws IOException;
 
     /**
      * An asynchronous request response where the Receipt will be returned
      * in the future
+     * @param command 
+     * @return the FutureResponse
+     * @throws IOException 
      */
     public FutureResponse asyncRequest(Command command) throws IOException;
-
+    
     /**
      * A synchronous request response
+     * @param command 
+     * @return the response
+     * @throws IOException 
      */
     public Response request(Command command) throws IOException;
 
     /**
+     * A synchronous request response
+     * @param command 
+     * @param timeout 
+     * @return the response, or null if the request timed out
+     * @throws IOException 
+     */
+    public Response request(Command command, int timeout) throws IOException;
+
+    /**
      * Returns the current transport listener
+     * @return 
      */
     public TransportListener getTransportListener();
 
     /**
      * Registers an inbound command listener
+     * @param commandListener 
      */
     public void setTransportListener(TransportListener commandListener);
     
+    /**
+     * @param target
+     * @return the target
+     */
     public Object narrow(Class target);
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Fri Mar 10 09:03:31 2006
@@ -93,6 +93,10 @@
     public Response request(Command command) throws IOException {
         return next.request(command);
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        return next.request(command,timeout);
+    }
 
     public void onException(IOException error) {
         transportListener.onException(error);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Fri Mar 10 09:03:31 2006
@@ -73,6 +73,10 @@
     public Response request(Command command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
 
     /**
      * Process the inbound command

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Mar 10 09:03:31 2006
@@ -389,6 +389,10 @@
     public Response request(Command command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
 
     public void add(URI u[]) {
         for (int i = 0; i < u.length; i++) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Mar 10 09:03:31 2006
@@ -418,6 +418,10 @@
     public Response request(Command command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
 
     public void reconnect() {
         log.debug("Waking up reconnect task");

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Fri Mar 10 09:03:31 2006
@@ -102,6 +102,10 @@
     synchronized public Response request(Command command) throws IOException {
         return next.request(command);
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        return next.request(command, timeout);
+    }
 
     synchronized public void onException(IOException error) {
         transportListener.onException(error);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Mar 10 09:03:31 2006
@@ -86,6 +86,10 @@
     public Response request(Command command) throws IOException{
         throw new AssertionError("Unsupported Method");
     }
+    
+    public Response request(Command command,int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
 
     public synchronized TransportListener getTransportListener() {
         return transportListener;