You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/04/07 15:32:21 UTC

svn commit: r392288 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/activeio/ main/java/org/apache/activemq/transport/failover/ ma...

Author: chirino
Date: Fri Apr  7 06:31:59 2006
New Revision: 392288

URL: http://svn.apache.org/viewcvs?rev=392288&view=rev
Log:
Updated the Transport interface so that you can pass in a ResponseCallback object that will be called when the response for a request arrives.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Apr  7 06:31:59 2006
@@ -45,6 +45,8 @@
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.IdGenerator;
@@ -376,26 +378,47 @@
             try{
                 if(command.isMessageDispatch()){
                     waitStarted();
-                    MessageDispatch md=(MessageDispatch) command;
+                    final MessageDispatch md=(MessageDispatch) command;
                     DemandSubscription sub=(DemandSubscription) subscriptionMapByLocalId.get(md.getConsumerId());
                     if(sub!=null){
                         Message message= configureMessage(md);
                         if(trace)
                             log.trace("bridging "+localBrokerName+" -> "+remoteBrokerName+": "+message);
-                        if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
+                        
+                        
+                        if( !message.isResponseRequired() ) {
+                            
+                            // If the message was originally sent using async send, we will preserve that QOS
+                            // by bridging it using an async send (small chance of message loss).
                             remoteBroker.oneway(message);
-                        }else{
-                            Response response=remoteBroker.request(message);
-                            if(response.isException()){
-                                ExceptionResponse er=(ExceptionResponse) response;
-                                serviceLocalException(er.getException());
-                            }
+                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                            
+                        } else {
+                            
+                            // The message was not sent using async send, so we should only ack the local 
+                            // broker when we get confirmation that the remote broker has received the message.
+                            ResponseCallback callback = new ResponseCallback() {
+                                public void onCompletion(FutureResponse future) {
+                                    try {
+                                        Response response = future.getResult();
+                                        if(response.isException()){
+                                            ExceptionResponse er=(ExceptionResponse) response;
+                                            serviceLocalException(er.getException());
+                                        } else {
+                                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                                        }
+                                    } catch (IOException e) {
+                                        serviceLocalException(e);
+                                    }
+                                }
+                            };
+
+                            remoteBroker.asyncRequest(message, callback);
                         }
                         
                       // Ack on every message since we don't know if the broker is blocked due to memory
                       // usage and is waiting for an Ack to un-block him. 
-                      localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
-
+                       
                       // Acking a range is more efficient, but also more prone to locking up a server
                       // Perhaps doing something like the following should be policy based.
 //                        int dispatched = sub.incrementDispatched();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Apr  7 06:31:59 2006
@@ -24,13 +24,17 @@
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
@@ -199,14 +203,14 @@
         }
     }
 
-    protected void serviceLocalException(IOException error) {
+    protected void serviceLocalException(Throwable error) {
         System.out.println("Unexpected local exception: "+error);
         error.printStackTrace();
     }    
     protected void serviceLocalCommand(Command command) {
         try {
             if( command.isMessageDispatch() ) {
-                MessageDispatch md = (MessageDispatch) command;
+                final MessageDispatch md = (MessageDispatch) command;
                 Message message = md.getMessage();
                 message.setProducerId(producerInfo.getProducerId());
                 message.setDestination( md.getDestination() );
@@ -216,11 +220,40 @@
                 message.setTransactionId(null);
                 message.evictMarshlledForm();
 
-                remoteBroker.oneway( message );
                 
+                if( !message.isResponseRequired() ) {
+                    
+                    // If the message was originally sent using async send, we will preserve that QOS
+                    // by bridging it using an async send (small chance of message loss).
+                    remoteBroker.oneway(message);
+                    localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                    
+                } else {
+                    
+                    // The message was not sent using async send, so we should only ack the local 
+                    // broker when we get confirmation that the remote broker has received the message.
+                    ResponseCallback callback = new ResponseCallback() {
+                        public void onCompletion(FutureResponse future) {
+                            try {
+                                Response response = future.getResult();
+                                if(response.isException()){
+                                    ExceptionResponse er=(ExceptionResponse) response;
+                                    serviceLocalException(er.getException());
+                                } else {
+                                    localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+                                }
+                            } catch (IOException e) {
+                                serviceLocalException(e);
+                            }
+                        }
+                    };
+
+                    remoteBroker.asyncRequest(message, callback);
+                }
+                
+                                
                 // Ack on every message since we don't know if the broker is blocked due to memory
                 // usage and is waiting for an Ack to un-block him. 
-                localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
 
                 // Acking a range is more efficient, but also more prone to locking up a server
                 // Perhaps doing something like the following should be policy based.

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/FutureResponse.java Fri Apr  7 06:31:59 2006
@@ -16,62 +16,47 @@
  */
 package org.apache.activemq.transport;
 
-import edu.emory.mathcs.backport.java.util.concurrent.Callable;
-import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
-import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
 
 import org.apache.activemq.command.Response;
-import org.apache.activemq.util.IOExceptionSupport;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 
-public class FutureResponse extends FutureTask {
+public class FutureResponse {
+           
+    private final ResponseCallback responseCallback;
+    private final ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
     
-    private static final Callable EMPTY_CALLABLE = new Callable() {
-        public Object call() throws Exception {
-            return null;
-        }};
-    
-    public FutureResponse() {
-        super(EMPTY_CALLABLE);
+    public FutureResponse(ResponseCallback responseCallback) {
+        this.responseCallback = responseCallback;
     }
 
-    public synchronized Response getResult() throws IOException {
+    public Response getResult() throws IOException {
         try {
-            return (Response) super.get();
+            return (Response) responseSlot.take();
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
-        } catch (ExecutionException e) {
-            Throwable target = e.getCause();
-            if( target instanceof IOException ) {
-                throw (IOException)target;
-            } else {
-                throw IOExceptionSupport.create(target);
-            }
         }
     }
     
-    public synchronized Response getResult(int timeout) throws IOException {
+    public Response getResult(int timeout) throws IOException {
         try {
-            return (Response) super.get(timeout,TimeUnit.MILLISECONDS);
+            return (Response) responseSlot.poll(timeout,TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             throw new InterruptedIOException("Interrupted.");
-        } catch (ExecutionException e) {
-            Throwable target = e.getCause();
-            if( target instanceof IOException ) {
-                throw (IOException)target;
-            } else {
-                throw IOExceptionSupport.create(target);
-            }
-        }catch(TimeoutException e){
-            return null;
         }
     }
     
-    public synchronized void set(Object result) {
-        super.set(result);
+    public void set(Response result) throws InterruptedIOException {
+        try {
+            responseSlot.put(result);
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException("Interrupted.");
+        }
+        if( responseCallback !=null ) {
+            responseCallback.onCompletion(this);
+        }        
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/MutexTransport.java Fri Apr  7 06:31:59 2006
@@ -33,9 +33,9 @@
         super(next);
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         synchronized(writeMutex) {
-            return next.asyncRequest(command);
+            return next.asyncRequest(command, null);
         }
     }
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java?rev=392288&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java Fri Apr  7 06:31:59 2006
@@ -0,0 +1,24 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport;
+
+/**
+ * @version $Revision$
+ */
+public interface ResponseCallback {
+    void onCompletion(FutureResponse resp);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCallback.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Fri Apr  7 06:31:59 2006
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
@@ -55,34 +56,38 @@
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         command.setCommandId(sequenceGenerator.getNextSequenceId());
         command.setResponseRequired(true);
-        FutureResponse future = new FutureResponse();
+        FutureResponse future = new FutureResponse(responseCallback);
         requestMap.put(new Integer(command.getCommandId()), future);
         next.oneway(command);
         return future;
     }
     
     public Response request(Command command) throws IOException { 
-        FutureResponse response = asyncRequest(command);
+        FutureResponse response = asyncRequest(command, null);
         return response.getResult();
     }
     
     public Response request(Command command,int timeout) throws IOException {
-        FutureResponse response = asyncRequest(command);
+        FutureResponse response = asyncRequest(command, null);
         return response.getResult(timeout);
     }
     
     public void onCommand(Command command) {
         boolean debug = log.isDebugEnabled();
         if( command.isResponse() ) {
-            Response response = (Response) command;
-            FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
-            if( future!=null ) {
-                future.set(response);
-            } else {
-                if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+            try {
+                Response response = (Response) command;
+                FutureResponse future = (FutureResponse) requestMap.remove(new Integer(response.getCorrelationId()));
+                if( future!=null ) {
+                    future.set(response);
+                } else {
+                    if( debug ) log.debug("Received unexpected response for command id: "+response.getCorrelationId());
+                }
+            } catch (InterruptedIOException e) {
+                onException(e);
             }
         } else {
             getTransportListener().onCommand(command);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Fri Apr  7 06:31:59 2006
@@ -39,12 +39,15 @@
 
     /**
      * An asynchronous request response where the Receipt will be returned
-     * in the future
+     * in the future.  If responseCallback is not null, then it will be called
+     * when the response has been completed.
+     * 
      * @param command 
+     * @param responseCallback optional callback that is called when the response has been completed; may be null
      * @return the FutureResponse
      * @throws IOException 
      */
-    public FutureResponse asyncRequest(Command command) throws IOException;
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
     
     /**
      * A synchronous request response

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Fri Apr  7 06:31:59 2006
@@ -86,8 +86,8 @@
         next.oneway(command);
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
-        return next.asyncRequest(command);
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
+        return next.asyncRequest(command, null);
     }
 
     public Response request(Command command) throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Fri Apr  7 06:31:59 2006
@@ -66,7 +66,7 @@
         return null;
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/activeio/ActiveIOTransport.java Fri Apr  7 06:31:59 2006
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.management.CountStatisticImpl;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 
@@ -96,7 +97,7 @@
         commandChannel.writeCommand(command);
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri Apr  7 06:31:59 2006
@@ -32,6 +32,7 @@
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
@@ -387,7 +388,7 @@
         }
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Fri Apr  7 06:31:59 2006
@@ -33,6 +33,7 @@
 import org.apache.activemq.transport.CompositeTransport;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
@@ -411,7 +412,7 @@
         return true;
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException {
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java Fri Apr  7 06:31:59 2006
@@ -22,6 +22,7 @@
 import org.apache.activemq.command.Response;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
@@ -95,8 +96,8 @@
         next.oneway(command);
     }
 
-    synchronized public FutureResponse asyncRequest(Command command) throws IOException {
-        return next.asyncRequest(command);
+    synchronized public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
+        return next.asyncRequest(command, null);
     }
 
     synchronized public Response request(Command command) throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Fri Apr  7 06:31:59 2006
@@ -78,7 +78,7 @@
     }
 
     public Response request(Command command) throws IOException {
-        FutureResponse response = asyncRequest(command);
+        FutureResponse response = asyncRequest(command, null);
         while (true) {
             Response result = response.getResult(requestTimeout);
             if (result != null) {
@@ -89,7 +89,7 @@
     }
 
     public Response request(Command command, int timeout) throws IOException {
-        FutureResponse response = asyncRequest(command);
+        FutureResponse response = asyncRequest(command, null);
         while (timeout > 0) {
             int time = timeout;
             if (timeout > requestTimeout) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Apr  7 06:31:59 2006
@@ -26,6 +26,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.transport.FutureResponse;
+import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.commons.logging.Log;
@@ -79,7 +80,7 @@
         }
     }
 
-    public FutureResponse asyncRequest(Command command) throws IOException{
+    public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException{
         throw new AssertionError("Unsupported Method");
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Fri Apr  7 06:31:59 2006
@@ -110,6 +110,7 @@
                 if(server==null){
                     server=(VMTransportServer) bind(location,true);
                     TransportConnector connector=new TransportConnector(broker.getBroker(),server);
+                    connector.setUri(location);
                     connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
                     connector.start();
                     connectors.put(host,connector);

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java?rev=392288&r1=392287&r2=392288&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/BadConnectionTest.java Fri Apr  7 06:31:59 2006
@@ -37,7 +37,7 @@
 
     public void testConnectingToUnavailableServer() throws Exception {
         try {
-            transport.asyncRequest(new ActiveMQMessage());
+            transport.asyncRequest(new ActiveMQMessage(), null);
             fail("This should never succeed");
         }
         catch (IOException e) {