You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/03/10 18:03:33 UTC
svn commit: r384853 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
./ transport/ transport/failover/ transport/fanout/ transport/mock/
transport/vm/
Author: rajdavies
Date: Fri Mar 10 09:03:31 2006
New Revision: 384853
URL: http://svn.apache.org/viewcvs?rev=384853&view=rev
Log:
Added request(Command, timeout) to transport and added a timeout on
close from ActiveMQConnection
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Fri Mar 10 09:03:31 2006
@@ -116,6 +116,7 @@
protected boolean asyncDispatch = true;
private boolean useAsyncSend = false;
private boolean useRetroactiveConsumer;
+ private int closeTimeout = 15000;
private long flowControlSleepTime = 0;
private final JMSConnectionStatsImpl stats;
@@ -541,7 +542,7 @@
if (isConnectionInfoSentToBroker) {
- syncSendPacket(info.createRemoveCommand());
+ syncSendPacket(info.createRemoveCommand(),closeTimeout);
}
asyncSendPacket(new ShutdownInfo());
@@ -735,6 +736,22 @@
}
/**
+ * @return Returns the closeTimeout.
+ */
+ public int getCloseTimeout(){
+ return closeTimeout;
+ }
+
+
+ /**
+ * @param closeTimeout The closeTimeout to set.
+ */
+ public void setCloseTimeout(int closeTimeout){
+ this.closeTimeout=closeTimeout;
+ }
+
+
+ /**
*
* @return Returns the onSendPrepareMessageBody.
*/
@@ -1064,6 +1081,41 @@
try {
Response response = this.transport.request(command);
+ if (response.isException()) {
+ ExceptionResponse er = (ExceptionResponse) response;
+ if (er.getException() instanceof JMSException)
+ throw (JMSException) er.getException();
+ else
+ throw JMSExceptionSupport.create(er.getException());
+ }
+ return response;
+ } catch (IOException e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ }
+
+ /**
+ * Send a packet through a Connection - for internal use only
+ *
+ * @param command
+ * @return
+ * @throws JMSException
+ */
+ public Response syncSendPacket(Command command, int timeout) throws JMSException {
+ if (isClosed()) {
+ throw new ConnectionClosedException();
+ } else {
+
+ if (command.isMessage() && flowControlSleepTime > 0) {
+ try {
+ Thread.sleep(flowControlSleepTime);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ try {
+ Response response = this.transport.request(command,timeout);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
if (er.getException() instanceof JMSException)
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Fri Mar 10 09:03:31 2006
@@ -81,6 +81,7 @@
private boolean objectMessageSerializationDefered = false;
protected boolean asyncDispatch = true;
private boolean useAsyncSend = false;
+ private int closeTimeout = 15000;
private boolean useRetroactiveConsumer;
JMSStatsImpl factoryStats = new JMSStatsImpl();
@@ -415,6 +416,7 @@
props.setProperty("useCompression", Boolean.toString(isUseCompression()));
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("userName", getUserName());
+ props.setProperty("closeTimeout", Integer.toString(getCloseTimeout()));
}
public boolean isOnSendPrepareMessageBody() {
@@ -447,5 +449,19 @@
public void setAsyncDispatch(boolean asyncDispatch) {
this.asyncDispatch = asyncDispatch;
+ }
+
+ /**
+ * @return Returns the closeTimeout.
+ */
+ public int getCloseTimeout(){
+ return closeTimeout;
+ }
+
+ /**
+ * @param closeTimeout The closeTimeout to set.
+ */
+ public void setCloseTimeout(int closeTimeout){
+ this.closeTimeout=closeTimeout;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Mar 10 09:03:31 2006
@@ -19,6 +19,8 @@
import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
import org.apache.activemq.command.Response;
import org.apache.activemq.util.IOExceptionSupport;
@@ -49,6 +51,23 @@
} else {
throw IOExceptionSupport.create(target);
}
+ }
+ }
+
+ public synchronized Response getResult(int timeout) throws IOException {
+ try {
+ return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted.");
+ } catch (ExecutionException e) {
+ Throwable target = e.getCause();
+ if( target instanceof IOException ) {
+ throw (IOException)target;
+ } else {
+ throw IOExceptionSupport.create(target);
+ }
+ }catch(TimeoutException e){
+ return null;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Fri Mar 10 09:03:31 2006
@@ -51,6 +51,12 @@
}
}
+ public Response request(Command command,int timeout) throws IOException {
+ synchronized(writeMutex){
+ return next.request(command,timeout);
+ }
+ }
+
public String toString() {
return next.toString();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Mar 10 09:03:31 2006
@@ -67,6 +67,11 @@
return response.getResult();
}
+ public Response request(Command command,int timeout) throws IOException {
+ FutureResponse response = asyncRequest(command);
+ return response.getResult(timeout);
+ }
+
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Mar 10 09:03:31 2006
@@ -32,30 +32,53 @@
/**
* A one way asynchronous send
+ * @param command
+ * @throws IOException
*/
public void oneway(Command command) throws IOException;
/**
* An asynchronous request response where the Receipt will be returned
* in the future
+ * @param command
+ * @return the FutureResponse
+ * @throws IOException
*/
public FutureResponse asyncRequest(Command command) throws IOException;
-
+
/**
* A synchronous request response
+ * @param command
+ * @return the response
+ * @throws IOException
*/
public Response request(Command command) throws IOException;
/**
+ * A synchronous request response
+ * @param command
+ * @param timeout
+ * @return the response, or null if the request timed out
+ * @throws IOException
+ */
+ public Response request(Command command, int timeout) throws IOException;
+
+ /**
* Returns the current transport listener
+ * @return
*/
public TransportListener getTransportListener();
/**
* Registers an inbound command listener
+ * @param commandListener
*/
public void setTransportListener(TransportListener commandListener);
+ /**
+ * @param target
+ * @return the target
+ */
public Object narrow(Class target);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Fri Mar 10 09:03:31 2006
@@ -93,6 +93,10 @@
public Response request(Command command) throws IOException {
return next.request(command);
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ return next.request(command,timeout);
+ }
public void onException(IOException error) {
transportListener.onException(error);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Fri Mar 10 09:03:31 2006
@@ -73,6 +73,10 @@
public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method");
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ throw new AssertionError("Unsupported Method");
+ }
/**
* Process the inbound command
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Mar 10 09:03:31 2006
@@ -389,6 +389,10 @@
public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method");
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ throw new AssertionError("Unsupported Method");
+ }
public void add(URI u[]) {
for (int i = 0; i < u.length; i++) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Mar 10 09:03:31 2006
@@ -418,6 +418,10 @@
public Response request(Command command) throws IOException {
throw new AssertionError("Unsupported Method");
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ throw new AssertionError("Unsupported Method");
+ }
public void reconnect() {
log.debug("Waking up reconnect task");
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Fri Mar 10 09:03:31 2006
@@ -102,6 +102,10 @@
synchronized public Response request(Command command) throws IOException {
return next.request(command);
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ return next.request(command, timeout);
+ }
synchronized public void onException(IOException error) {
transportListener.onException(error);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=384853&r1=384852&r2=384853&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Mar 10 09:03:31 2006
@@ -86,6 +86,10 @@
public Response request(Command command) throws IOException{
throw new AssertionError("Unsupported Method");
}
+
+ public Response request(Command command,int timeout) throws IOException {
+ throw new AssertionError("Unsupported Method");
+ }
public synchronized TransportListener getTransportListener() {
return transportListener;