You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/07 15:32:21 UTC
svn commit: r392288 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/
main/java/org/apache/activemq/transport/
main/java/org/apache/activemq/transport/activeio/
main/java/org/apache/activemq/transport/failover/ ma...
Author: chirino
Date: Fri Apr 7 06:31:59 2006
New Revision: 392288
URL: http://svn.apache.org/viewcvs?rev=392288&view=rev
Log:
Updated the Transport interface so that you can pass in a ResponseCallback object that will be called when the response for a request arrives.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr 7 06:31:59 2006
@@ -45,6 +45,8 @@
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
@@ -376,26 +378,47 @@
try{
if(command.isMessageDispatch()){
waitStarted();
- MessageDispatch md=(MessageDispatch) command;
+ final MessageDispatch md=(MessageDispatch) command;
DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){
Message message= configureMessage(md);
if(trace)
log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
- if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
+
+
+ if( !message.isResponseRequired() ) {
+
+ // If the message was originally sent using async send, we will preserve that QOS
+ // by bridging it using an async send (small chance of message loss).
remoteBroker.oneway(message);
- }else{
- Response response=remoteBroker.request(message);
- if(response.isException()){
- ExceptionResponse er=(ExceptionResponse) response;
- serviceLocalException(er.getException());
- }
+ localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+ } else {
+
+ // The message was not sent using async send, so we should only ack the local
+ // broker when we get confirmation that the remote broker has received the message.
+ ResponseCallback callback = new ResponseCallback() {
+ public void onCompletion(FutureResponse future) {
+ try {
+ Response response = future.getResult();
+ if(response.isException()){
+ ExceptionResponse er=(ExceptionResponse) response;
+ serviceLocalException(er.getException());
+ } else {
+ localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+ }
+ } catch (IOException e) {
+ serviceLocalException(e);
+ }
+ }
+ };
+
+ remoteBroker.asyncRequest(message, callback);
}
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
- localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
-
+
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
// int dispatched = sub.incrementDispatched();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Apr 7 06:31:59 2006
@@ -24,13 +24,17 @@
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@@ -199,14 +203,14 @@
}
}
- protected void serviceLocalException(IOException error) {
+ protected void serviceLocalException(Throwable error) {
System.out.println("Unexpected local exception: "+error);
error.printStackTrace();
}
protected void serviceLocalCommand(Command command) {
try {
if( command.isMessageDispatch() ) {
- MessageDispatch md = (MessageDispatch) command;
+ final MessageDispatch md = (MessageDispatch) command;
Message message = md.getMessage();
message.setProducerId(producerInfo.getProducerId());
message.setDestination( md.getDestination() );
@@ -216,11 +220,40 @@
message.setTransactionId(null);
message.evictMarshlledForm();
- remoteBroker.oneway( message );
+ if( !message.isResponseRequired() ) {
+
+ // If the message was originally sent using async send, we will preserve that QOS
+ // by bridging it using an async send (small chance of message loss).
+ remoteBroker.oneway(message);
+ localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+ } else {
+
+ // The message was not sent using async send, so we should only ack the local
+ // broker when we get confirmation that the remote broker has received the message.
+ ResponseCallback callback = new ResponseCallback() {
+ public void onCompletion(FutureResponse future) {
+ try {
+ Response response = future.getResult();
+ if(response.isException()){
+ ExceptionResponse er=(ExceptionResponse) response;
+ serviceLocalException(er.getException());
+ } else {
+ localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+ }
+ } catch (IOException e) {
+ serviceLocalException(e);
+ }
+ }
+ };
+
+ remoteBroker.asyncRequest(message, callback);
+ }
+
+
// Ack on every message since we don't know if the broker is blocked due to memory
// usage and is waiting for an Ack to un-block him.
- localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
// Acking a range is more efficient, but also more prone to locking up a server
// Perhaps doing something like the following should be policy based.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Apr 7 06:31:59 2006
@@ -16,62 +16,47 @@
*/
package org.apache.activemq.transport;
-import edu.emory.mathcs.backport.java.util.concurrent.Callable;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
-import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
import org.apache.activemq.command.Response;
-import org.apache.activemq.util.IOExceptionSupport;
-import java.io.IOException;
-import java.io.InterruptedIOException;
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-public class FutureResponse extends FutureTask {
+public class FutureResponse {
+
+ private final ResponseCallback responseCallback;
+ private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
- private static final Callable EMPTY_CALLABLE = new Callable() {
- public Object call() throws Exception {
- return null;
- }};
-
- public FutureResponse() {
- super(EMPTY_CALLABLE);
+ public FutureResponse(ResponseCallback responseCallback) {
+ this.responseCallback = responseCallback;
}
- public synchronized Response getResult() throws IOException {
+ public Response getResult() throws IOException {
try {
- return (Response) super.get();
+ return (Response) responseSlot.take();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
- } catch (ExecutionException e) {
- Throwable target = e.getCause();
- if( target instanceof IOException ) {
- throw (IOException)target;
- } else {
- throw IOExceptionSupport.create(target);
- }
}
}
- public synchronized Response getResult(int timeout) throws IOException {
+ public Response getResult(int timeout) throws IOException {
try {
- return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
+ return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted.");
- } catch (ExecutionException e) {
- Throwable target = e.getCause();
- if( target instanceof IOException ) {
- throw (IOException)target;
- } else {
- throw IOExceptionSupport.create(target);
- }
- }catch(TimeoutException e){
- return null;
}
}
- public synchronized void set(Object result) {
- super.set(result);
+ public void set(Response result) throws InterruptedIOException {
+ try {
+ responseSlot.put(result);
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted.");
+ }
+ if( responseCallback !=null ) {
+ responseCallback.onCompletion(this);
+ }
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Fri Apr 7 06:31:59 2006
@@ -33,9 +33,9 @@
super(next);
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
synchronized(writeMutex) {
- return next.asyncRequest(command);
+ return next.asyncRequest(command, null);
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java?rev=392288&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java Fri Apr 7 06:31:59 2006
@@ -0,0 +1,24 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * @version $Revision$
+ */
+public interface ResponseCallback {
+ void onCompletion(FutureResponse resp);
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Apr 7 06:31:59 2006
@@ -17,6 +17,7 @@
package org.apache.activemq.transport;
import java.io.IOException;
+import java.io.InterruptedIOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
@@ -55,34 +56,38 @@
next.oneway(command);
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
- FutureResponse future = new FutureResponse();
+ FutureResponse future = new FutureResponse(responseCallback);
requestMap.put(new Integer(command.getCommandId()), future);
next.oneway(command);
return future;
}
public Response request(Command command) throws IOException {
- FutureResponse response = asyncRequest(command);
+ FutureResponse response = asyncRequest(command, null);
return response.getResult();
}
public Response request(Command command,int timeout) throws IOException {
- FutureResponse response = asyncRequest(command);
+ FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout);
}
public void onCommand(Command command) {
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
- Response response = (Response) command;
- FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
- if( future!=null ) {
- future.set(response);
- } else {
- if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+ try {
+ Response response = (Response) command;
+ FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
+ if( future!=null ) {
+ future.set(response);
+ } else {
+ if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+ }
+ } catch (InterruptedIOException e) {
+ onException(e);
}
} else {
getTransportListener().onCommand(command);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Apr 7 06:31:59 2006
@@ -39,12 +39,15 @@
/**
* An asynchronous request response where the Receipt will be returned
- * in the future
+ * in the future. If responseCallback is not null, then it will be called
+ * when the response has been completed.
+ *
* @param command
+ * @param responseCallback TODO
* @return the FutureResponse
* @throws IOException
*/
- public FutureResponse asyncRequest(Command command) throws IOException;
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
/**
* A synchronous request response
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Fri Apr 7 06:31:59 2006
@@ -86,8 +86,8 @@
next.oneway(command);
}
- public FutureResponse asyncRequest(Command command) throws IOException {
- return next.asyncRequest(command);
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
+ return next.asyncRequest(command, null);
}
public Response request(Command command) throws IOException {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Fri Apr 7 06:31:59 2006
@@ -66,7 +66,7 @@
return null;
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java Fri Apr 7 06:31:59 2006
@@ -27,6 +27,7 @@
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
@@ -96,7 +97,7 @@
commandChannel.writeCommand(command);
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Apr 7 06:31:59 2006
@@ -32,6 +32,7 @@
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@@ -387,7 +388,7 @@
}
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Apr 7 06:31:59 2006
@@ -33,6 +33,7 @@
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@@ -411,7 +412,7 @@
return true;
}
- public FutureResponse asyncRequest(Command command) throws IOException {
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Fri Apr 7 06:31:59 2006
@@ -22,6 +22,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@@ -95,8 +96,8 @@
next.oneway(command);
}
- synchronized public FutureResponse asyncRequest(Command command) throws IOException {
- return next.asyncRequest(command);
+ synchronized public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
+ return next.asyncRequest(command, null);
}
synchronized public Response request(Command command) throws IOException {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Fri Apr 7 06:31:59 2006
@@ -78,7 +78,7 @@
}
public Response request(Command command) throws IOException {
- FutureResponse response = asyncRequest(command);
+ FutureResponse response = asyncRequest(command, null);
while (true) {
Response result = response.getResult(requestTimeout);
if (result != null) {
@@ -89,7 +89,7 @@
}
public Response request(Command command, int timeout) throws IOException {
- FutureResponse response = asyncRequest(command);
+ FutureResponse response = asyncRequest(command, null);
while (timeout > 0) {
int time = timeout;
if (timeout > requestTimeout) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Apr 7 06:31:59 2006
@@ -26,6 +26,7 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.logging.Log;
@@ -79,7 +80,7 @@
}
}
- public FutureResponse asyncRequest(Command command) throws IOException{
+ public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Fri Apr 7 06:31:59 2006
@@ -110,6 +110,7 @@
if(server==null){
server=(VMTransportServer) bind(location,true);
TransportConnector connector=new TransportConnector(broker.getBroker(),server);
+ connector.setUri(location);
connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
connector.start();
connectors.put(host,connector);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java Fri Apr 7 06:31:59 2006
@@ -37,7 +37,7 @@
public void testConnectingToUnavailableServer() throws Exception {
try {
- transport.asyncRequest(new ActiveMQMessage());
+ transport.asyncRequest(new ActiveMQMessage(), null);
fail("This should never succeed");
}
catch (IOException e) {