You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/14 19:30:55 UTC

svn commit: r443423 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq: broker/ state/ transport/failover/ transport/fanout/

Author: chirino
Date: Thu Sep 14 10:30:54 2006
New Revision: 443423

URL: http://svn.apache.org/viewvc?view=rev&rev=443423
Log:
https://issues.apache.org/activemq/browse/AMQ-915

Added:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Sep 14 10:30:54 2006
@@ -61,6 +61,7 @@
 import org.apache.activemq.state.ConsumerState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
+import org.apache.activemq.state.TransactionState;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -307,7 +308,12 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.beginTransaction(context, info.getTransactionId());
+        
+        // Avoid replaying dup commands
+        if( cs.getTransactionState(info.getTransactionId())==null ) {
+        	cs.addTransactionState(info.getTransactionId());
+            broker.beginTransaction(context, info.getTransactionId());
+        }
         return null;
     }
     
@@ -324,9 +330,22 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        int result = broker.prepareTransaction(context, info.getTransactionId());
-        IntegerResponse response = new IntegerResponse(result);
-        return response;
+        
+        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+        if( transactionState == null )
+            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
+
+        // Avoid dups.
+        if( !transactionState.isPrepared() ) {
+            transactionState.setPrepared(true);
+            int result = broker.prepareTransaction(context, info.getTransactionId());
+            transactionState.setPreparedResult(result);
+            IntegerResponse response = new IntegerResponse(result);
+            return response;
+        } else {
+            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
+            return response;
+        }
     }
 
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@@ -335,8 +354,12 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
+        
+        cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context, info.getTransactionId(), true);
+
         return null;
+        
     }
 
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@@ -345,7 +368,9 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.commitTransaction(context, info.getTransactionId(), false);
+        
+        cs.removeTransactionState(info.getTransactionId());
+    	broker.commitTransaction(context, info.getTransactionId(), false);
         return null;
     }
 
@@ -355,7 +380,9 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.rollbackTransaction(context, info.getTransactionId());
+        
+        cs.removeTransactionState(info.getTransactionId());
+    	broker.rollbackTransaction(context, info.getTransactionId());
         return null;
     }
     
@@ -381,10 +408,32 @@
 
 
     public Response processMessage(Message messageSend) throws Exception {
+    	
         ProducerId producerId = messageSend.getProducerId();
         ConnectionState state = lookupConnectionState(producerId);
         ConnectionContext context = state.getContext();
-        broker.send(context, messageSend);
+        
+        // If the message originates from this client connection, 
+        // then find the associated producer state so we can do some dup detection.
+        ProducerState producerState=null;        
+        if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
+	        SessionState ss = state.getSessionState(producerId.getParentId());
+	        if( ss == null )
+	            throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
+	        producerState = ss.getProducerState(producerId); 
+        }
+        
+        if( producerState == null ) {
+            broker.send(context, messageSend);
+        } else {
+	        // Avoid Dups.
+	        long seq = messageSend.getMessageId().getProducerSequenceId();
+	        if( seq > producerState.getLastSequenceId() ) {
+	        	producerState.setLastSequenceId(seq);
+	            broker.send(context, messageSend);
+	        }
+        }
+        
         return null;
     }
 

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Thu Sep 14 10:30:54 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@
 public class ConnectionState {
     
     final ConnectionInfo info;
+    private final ConcurrentHashMap transactions = new ConcurrentHashMap();
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -63,6 +65,20 @@
                 iter.remove();
             }
         }
+    }
+	
+    public void addTransactionState(TransactionId id) {
+    	checkShutdown();
+    	transactions.put(id, new TransactionState(id));
+    }        
+    public TransactionState getTransactionState(TransactionId id) {
+        return (TransactionState)transactions.get(id);
+    }
+    public Collection getTransactionStates() {
+        return transactions.values();
+    }
+    public TransactionState removeTransactionState(TransactionId id) {
+        return (TransactionState) transactions.remove(id);
     }
 
     public void addSession(SessionInfo info) {

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Sep 14 10:30:54 2006
@@ -54,21 +54,39 @@
  */
 public class ConnectionStateTracker implements CommandVisitor {
 
-    private final static Response TRACKED_RESPONSE_MARKER = new Response();
+	private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
     
-    boolean trackTransactions = false;
-    boolean trackMessages = false;
-    boolean trackAcks = false;
+	private boolean trackTransactions = false;
     
     private boolean restoreSessions=true;
-    boolean restoreConsumers=true;
+    private boolean restoreConsumers=true;
     private boolean restoreProducers=true;
+    private boolean restoreTransaction=true;
     
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-    
-    public boolean track(Command command) throws IOException {
+        
+    /** Removes a finished transaction's tracked state; meant to run once the broker has answered. */
+    private class RemoveTransactionAction implements Runnable {
+		private final TransactionInfo info;
+		public RemoveTransactionAction(TransactionInfo info) { this.info = info; }
+		public void run() {
+	        ConnectionState cs = (ConnectionState) connectionStates.get(info.getConnectionId());
+	        if( cs != null ) { // no state registered for this connection: nothing to clean up
+	        	cs.removeTransactionState(info.getTransactionId());
+	        }
+		}
+    }
+
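+    /**
+     * Visits the command with this tracker so that any state it carries can be recorded.
+     * 
+     * @param command the command to inspect
+     * @return null if the command is not state tracked.
+     * @throws IOException if visiting the command fails with an IOException
+     */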
+    public Tracked track(Command command) throws IOException {
         try {
-            return command.visit(this)!=null;
+        	return (Tracked) command.visit(this);
         } catch (IOException e) {
             throw e;
         } catch (Throwable e) {
@@ -85,10 +103,23 @@
             
             if( restoreSessions )
                 restoreSessions(transport, connectionState);
+            
+            if( restoreTransaction )
+            	restoreTransactions(transport, connectionState);
         }
     }
 
-    /**
+    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+    	// Re-send, one way, every command recorded for each tracked transaction of this connection.
+    	for (Iterator states = connectionState.getTransactionStates().iterator(); states.hasNext();) {
+			Iterator recorded = ((TransactionState) states.next()).getCommands().iterator();
+			while (recorded.hasNext()) {
+	            transport.oneway((Command) recorded.next());
+			}
+		}
+	}
+
+	/**
      * @param transport
      * @param connectionState
      * @throws IOException
@@ -226,48 +257,113 @@
         return null;
     }
     public Response processMessage(Message send) throws Exception {
-        return null;
-    }
+    	if( trackTransactions && send.getTransactionId() != null ) {
+            ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+            transactionState.addCommand(send);    		
+            return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
+    }    
     public Response processMessageAck(MessageAck ack) throws Exception {
-        return null;
+    	if( trackTransactions && ack.getTransactionId() != null ) {
+            ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
+            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+            TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
+            transactionState.addCommand(ack);    		
+            return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
     }
+    
     public Response processBeginTransaction(TransactionInfo info) throws Exception {
-        return null;
-    }
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        cs.addTransactionState(info.getTransactionId());
+	        return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
+    }    
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        transactionState.addCommand(info);
+	        return TRACKED_RESPONSE_MARKER;
+    	} 
+    	return null;
     }
+    
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-        return null;
-    }
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
+    }        
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
     }
+    
     public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
     }
-    public Response processWireFormat(WireFormatInfo info) throws Exception {
-        return null;
+    
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        transactionState.addCommand(info);
+	        return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
     }
-    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+    
+    public Response processRecoverTransactions(TransactionInfo info) {
         return null;
     }
-    public Response processShutdown(ShutdownInfo info) throws Exception {
+    public Response processForgetTransaction(TransactionInfo info) throws Exception {
         return null;
     }
-    public Response processBrokerInfo(BrokerInfo info) throws Exception {
+
+    
+    public Response processWireFormat(WireFormatInfo info) throws Exception {
         return null;
     }
-
-    public Response processRecoverTransactions(TransactionInfo info) {
+    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
         return null;
     }
-
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+    public Response processShutdown(ShutdownInfo info) throws Exception {
         return null;
     }
-
-    public Response processEndTransaction(TransactionInfo info) throws Exception {
+    public Response processBrokerInfo(BrokerInfo info) throws Exception {
         return null;
     }
 
@@ -302,4 +398,20 @@
     public void setRestoreSessions(boolean restoreSessions) {
         this.restoreSessions = restoreSessions;
     }
+
+	public boolean isTrackTransactions() {
+		return trackTransactions;
+	}
+
+	public void setTrackTransactions(boolean trackTransactions) {
+		this.trackTransactions = trackTransactions;
+	}
+
+	public boolean isRestoreTransaction() {
+		return restoreTransaction;
+	}
+
+	public void setRestoreTransaction(boolean restoreTransaction) {
+		this.restoreTransaction = restoreTransaction;
+	}
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Sep 14 10:30:54 2006
@@ -21,7 +21,8 @@
 import org.apache.activemq.command.ProducerInfo;
 
 public class ProducerState {        
-    final ProducerInfo info;  
+    final ProducerInfo info;
+	private long lastSequenceId=-1;  
     
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -31,5 +32,11 @@
     }
     public ProducerInfo getInfo() {
         return info;
-    }        
+    }
+	public void setLastSequenceId(long lastSequenceId) {
+		this.lastSequenceId = lastSequenceId;		
+	}
+	public long getLastSequenceId() {
+		return lastSequenceId;
+	}        
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Sep 14 10:30:54 2006
@@ -69,11 +69,13 @@
     }                
     public Set getProducerIds() {
         return producers.keySet();
-    }
-    
+    }    
     public Collection getProducerStates() {
         return producers.values();
     }
+	public ProducerState getProducerState(ProducerId producerId) {
+		return (ProducerState) producers.get(producerId);
+	}
     
     public Collection getConsumerStates() {
         return consumers.values();

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java?view=auto&rev=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java Thu Sep 14 10:30:54 2006
@@ -0,0 +1,27 @@
+/**
+ * 
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.Response;
+
+/** Response marker for a state-tracked command, optionally carrying a callback to run on the reply. */
+public class Tracked extends Response {
+	/** Pending callback; null when none was supplied or after it has run. */
+	private Runnable runnable;
+	public Tracked(Runnable runnable) {
+		this.runnable = runnable;
+	}
+	/** Runs the pending callback, if any, and then clears it so it cannot run twice. */
+	public void onResponses() {
+		if( runnable == null ) {
+			return;
+		}
+		runnable.run();
+		runnable = null;
+	}
+	/** Returns true while a callback is still pending. */
+	public boolean isWaitingForResponse() {
+		return !(runnable == null);
+	}
+}
\ No newline at end of file

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?view=auto&rev=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java Thu Sep 14 10:30:54 2006
@@ -0,0 +1,79 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.TransactionId;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+/** Tracks one transaction: the commands recorded for it and the outcome of its prepare phase. */
+public class TransactionState {
+    final TransactionId id;
+    /** Commands added to this transaction, in the order they were added. */
+    public final ArrayList commands = new ArrayList();
+    /** True once shutdown() has been called; addCommand() then rejects further commands. */
+    private final AtomicBoolean shutdown = new AtomicBoolean(false);
+	private boolean prepared;
+	private int preparedResult;
+
+    public TransactionState(TransactionId id) {
+        this.id = id;
+    }
+    public String toString() {
+        return id.toString();
+    }
+    /** Records a command for this transaction; throws IllegalStateException after shutdown(). */
+    public void addCommand(Command operation) {
+    	checkShutdown();
+    	commands.add(operation);
+    }
+    /** Returns the live (not copied) list of recorded commands. */
+    public List getCommands() {
+    	return commands;
+    }
+
+    private void checkShutdown() {
+		if( shutdown.get() )
+			throw new IllegalStateException("Disposed");
+	}
+
+    /** Marks this state as disposed. */
+    public void shutdown() {
+    	// The flag must become true here: checkShutdown() only rejects when it is set.
+    	shutdown.set(true);
+    }
+	public TransactionId getId() {
+		return id;
+	}
+	public void setPrepared(boolean prepared) {
+		this.prepared = prepared;
+	}
+	public boolean isPrepared() {
+		return prepared;
+	}
+	public void setPreparedResult(int preparedResult) {
+		this.preparedResult = preparedResult;
+	}
+	public int getPreparedResult() {
+		return preparedResult;
+	}
+}
\ No newline at end of file

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 14 10:30:54 2006
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.state.Tracked;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -86,7 +87,10 @@
                 return;
             }
             if (command.isResponse()) {
-                requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+                Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+                if( object!=null && object.getClass() == Tracked.class ) {
+                	((Tracked)object).onResponses();
+                }
             }
             if (!initialized){
                 if (command.isBrokerInfo()){
@@ -132,6 +136,8 @@
 
     public FailoverTransport() throws InterruptedIOException {
 
+    	stateTracker.setTrackTransactions(true);
+    	
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
 
@@ -368,7 +374,10 @@
                         // the state tracker,
                         // then hold it in the requestMap so that we can replay
                         // it later.
-                        if (!stateTracker.track(command) && command.isResponseRequired()) {
+                        Tracked tracked = stateTracker.track(command);
+                        if( tracked!=null && tracked.isWaitingForResponse() ) {
+                            requestMap.put(new Integer(command.getCommandId()), tracked);
+                        } else if ( tracked==null && command.isResponseRequired()) {
                             requestMap.put(new Integer(command.getCommandId()), command);
                         }
                                                 
@@ -376,13 +385,20 @@
                         try {
                             connectedTransport.oneway(command);
                         } catch (IOException e) {
-                            // If there is an IOException in the send, remove the command from the requestMap
-                            if (!stateTracker.track(command) && command.isResponseRequired()) {
-                                requestMap.remove(new Integer(command.getCommandId()), command);
-                            }
-                            
-                            // Rethrow the exception so it will handled by the outer catch
-                            throw e;
+                        	
+                        	// If the command was not tracked.. we will retry in this method
+                        	if( tracked==null ) {
+                        		
+                        		// since we will retry in this method.. take it out of the request
+                        		// map so that it is not sent 2 times on recovery
+                            	if( command.isResponseRequired() ) {
+                            		requestMap.remove(new Integer(command.getCommandId()));
+                            	}
+                            	
+                                // Rethrow the exception so it will handled by the outer catch
+                                throw e;
+                        	}
+                        	
                         }
                         
                         return;

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Sep 14 10:30:54 2006
@@ -340,7 +340,7 @@
                 // then hold it in the requestMap so that we can replay
                 // it later.
                 boolean fanout = isFanoutCommand(command);
-                if (!stateTracker.track(command) && command.isResponseRequired() ) {
+                if (stateTracker.track(command)==null && command.isResponseRequired() ) {
                     int size = fanout ? minAckCount : 1;
                     requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
                 }