You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/14 19:58:51 UTC

svn commit: r443430 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ state/ transport/failover/ transport/fanout/

Author: chirino
Date: Thu Sep 14 10:58:49 2006
New Revision: 443430

URL: http://svn.apache.org/viewvc?view=rev&rev=443430
Log:
https://issues.apache.org/activemq/browse/AMQ-915

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
      - copied unchanged from r443423, incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
      - copied unchanged from r443423, incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Sep 14 10:58:49 2006
@@ -62,6 +62,7 @@
 import org.apache.activemq.state.ConsumerState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
+import org.apache.activemq.state.TransactionState;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -308,7 +309,12 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.beginTransaction(context, info.getTransactionId());
+        
+        // Avoid replaying dup commands
+        if( cs.getTransactionState(info.getTransactionId())==null ) {
+        	cs.addTransactionState(info.getTransactionId());
+            broker.beginTransaction(context, info.getTransactionId());
+        }
         return null;
     }
     
@@ -325,9 +331,22 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        int result = broker.prepareTransaction(context, info.getTransactionId());
-        IntegerResponse response = new IntegerResponse(result);
-        return response;
+        
+        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+        if( transactionState == null )
+            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
+
+        // Avoid dups.
+        if( !transactionState.isPrepared() ) {
+            transactionState.setPrepared(true);
+            int result = broker.prepareTransaction(context, info.getTransactionId());
+            transactionState.setPreparedResult(result);
+            IntegerResponse response = new IntegerResponse(result);
+            return response;
+        } else {
+            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
+            return response;
+        }
     }
 
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@@ -336,8 +355,12 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
+        
+        cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context, info.getTransactionId(), true);
+
         return null;
+        
     }
 
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@@ -346,7 +369,9 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.commitTransaction(context, info.getTransactionId(), false);
+        
+        cs.removeTransactionState(info.getTransactionId());
+    	broker.commitTransaction(context, info.getTransactionId(), false);
         return null;
     }
 
@@ -356,7 +381,9 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        broker.rollbackTransaction(context, info.getTransactionId());
+        
+        cs.removeTransactionState(info.getTransactionId());
+    	broker.rollbackTransaction(context, info.getTransactionId());
         return null;
     }
     
@@ -382,10 +409,32 @@
 
 
     public Response processMessage(Message messageSend) throws Exception {
+    	
         ProducerId producerId = messageSend.getProducerId();
         ConnectionState state = lookupConnectionState(producerId);
         ConnectionContext context = state.getContext();
-        broker.send(context, messageSend);
+        
+        // If the message originates from this client connection, 
+        // then, finde the associated producer state so we can do some dup detection.
+        ProducerState producerState=null;        
+        if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
+	        SessionState ss = state.getSessionState(producerId.getParentId());
+	        if( ss == null )
+	            throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
+	        producerState = ss.getProducerState(producerId); 
+        }
+        
+        if( producerState == null ) {
+            broker.send(context, messageSend);
+        } else {
+	        // Avoid Dups.
+	        long seq = messageSend.getMessageId().getProducerSequenceId();
+	        if( seq > producerState.getLastSequenceId() ) {
+	        	producerState.setLastSequenceId(seq);
+	            broker.send(context, messageSend);
+	        }
+        }
+        
         return null;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Thu Sep 14 10:58:49 2006
@@ -30,6 +30,7 @@
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@
 public class ConnectionState {
     
     final ConnectionInfo info;
+    private final ConcurrentHashMap transactions = new ConcurrentHashMap();
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -63,6 +65,20 @@
                 iter.remove();
             }
         }
+    }
+	
+    public void addTransactionState(TransactionId id) {
+    	checkShutdown();
+    	transactions.put(id, new TransactionState(id));
+    }        
+    public TransactionState getTransactionState(TransactionId id) {
+        return (TransactionState)transactions.get(id);
+    }
+    public Collection getTransactionStates() {
+        return transactions.values();
+    }
+    public TransactionState removeTransactionState(TransactionId id) {
+        return (TransactionState) transactions.remove(id);
     }
 
     public void addSession(SessionInfo info) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Sep 14 10:58:49 2006
@@ -55,21 +55,39 @@
  */
 public class ConnectionStateTracker implements CommandVisitor {
 
-    private final static Response TRACKED_RESPONSE_MARKER = new Response();
+	private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
     
-    boolean trackTransactions = false;
-    boolean trackMessages = false;
-    boolean trackAcks = false;
+	private boolean trackTransactions = false;
     
     private boolean restoreSessions=true;
-    boolean restoreConsumers=true;
+    private boolean restoreConsumers=true;
     private boolean restoreProducers=true;
+    private boolean restoreTransaction=true;
     
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-    
-    public boolean track(Command command) throws IOException {
+        
+    private class RemoveTransactionAction implements Runnable {
+		private final TransactionInfo info;
+		public RemoveTransactionAction(TransactionInfo info) {
+			this.info = info;
+		}
+		public void run() {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        cs.removeTransactionState(info.getTransactionId());
+		}
+    }
+
+    /**
+     * 
+     * 
+     * @param command
+     * @return null if the command is not state tracked.
+     * @throws IOException
+     */
+    public Tracked track(Command command) throws IOException {
         try {
-            return command.visit(this)!=null;
+        	return (Tracked) command.visit(this);
         } catch (IOException e) {
             throw e;
         } catch (Throwable e) {
@@ -86,10 +104,23 @@
             
             if( restoreSessions )
                 restoreSessions(transport, connectionState);
+            
+            if( restoreTransaction )
+            	restoreTransactions(transport, connectionState);
         }
     }
 
-    /**
+    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+    	for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
+			TransactionState transactionState = (TransactionState) iter.next();
+			for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
+				Command command = (Command) iterator.next();
+	            transport.oneway(command);
+			}
+		}
+	}
+
+	/**
      * @param transport
      * @param connectionState
      * @throws IOException
@@ -227,48 +258,113 @@
         return null;
     }
     public Response processMessage(Message send) throws Exception {
-        return null;
-    }
+    	if( trackTransactions && send.getTransactionId() != null ) {
+            ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+            transactionState.addCommand(send);    		
+            return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
+    }    
     public Response processMessageAck(MessageAck ack) throws Exception {
-        return null;
+    	if( trackTransactions && ack.getTransactionId() != null ) {
+            ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
+            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+            TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
+            transactionState.addCommand(ack);    		
+            return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
     }
+    
     public Response processBeginTransaction(TransactionInfo info) throws Exception {
-        return null;
-    }
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        cs.addTransactionState(info.getTransactionId());
+	        return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
+    }    
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        transactionState.addCommand(info);
+	        return TRACKED_RESPONSE_MARKER;
+    	} 
+    	return null;
     }
+    
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-        return null;
-    }
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
+    }        
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
     }
+    
     public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-        return null;
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        if( transactionState !=null ) {
+		        transactionState.addCommand(info);
+		        return new Tracked(new RemoveTransactionAction(info));
+	        }
+    	}
+    	return null;
     }
-    public Response processWireFormat(WireFormatInfo info) throws Exception {
-        return null;
+    
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+    	if( trackTransactions ) {
+	        ConnectionId connectionId = info.getConnectionId();
+	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+	        transactionState.addCommand(info);
+	        return TRACKED_RESPONSE_MARKER;
+    	}
+    	return null;
     }
-    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+    
+    public Response processRecoverTransactions(TransactionInfo info) {
         return null;
     }
-    public Response processShutdown(ShutdownInfo info) throws Exception {
+    public Response processForgetTransaction(TransactionInfo info) throws Exception {
         return null;
     }
-    public Response processBrokerInfo(BrokerInfo info) throws Exception {
+
+    
+    public Response processWireFormat(WireFormatInfo info) throws Exception {
         return null;
     }
-
-    public Response processRecoverTransactions(TransactionInfo info) {
+    public Response processKeepAlive(KeepAliveInfo info) throws Exception {
         return null;
     }
-
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+    public Response processShutdown(ShutdownInfo info) throws Exception {
         return null;
     }
-
-    public Response processEndTransaction(TransactionInfo info) throws Exception {
+    public Response processBrokerInfo(BrokerInfo info) throws Exception {
         return null;
     }
 
@@ -307,5 +403,21 @@
     public void setRestoreSessions(boolean restoreSessions) {
         this.restoreSessions = restoreSessions;
     }
-        
+
+	public boolean isTrackTransactions() {
+		return trackTransactions;
+	}
+
+	public void setTrackTransactions(boolean trackTransactions) {
+		this.trackTransactions = trackTransactions;
+	}
+
+	public boolean isRestoreTransaction() {
+		return restoreTransaction;
+	}
+
+	public void setRestoreTransaction(boolean restoreTransaction) {
+		this.restoreTransaction = restoreTransaction;
+	}
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Sep 14 10:58:49 2006
@@ -21,7 +21,8 @@
 import org.apache.activemq.command.ProducerInfo;
 
 public class ProducerState {        
-    final ProducerInfo info;  
+    final ProducerInfo info;
+	private long lastSequenceId=-1;  
     
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -31,5 +32,11 @@
     }
     public ProducerInfo getInfo() {
         return info;
-    }        
+    }
+	public void setLastSequenceId(long lastSequenceId) {
+		this.lastSequenceId = lastSequenceId;		
+	}
+	public long getLastSequenceId() {
+		return lastSequenceId;
+	}        
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Sep 14 10:58:49 2006
@@ -69,11 +69,13 @@
     }                
     public Set getProducerIds() {
         return producers.keySet();
-    }
-    
+    }    
     public Collection getProducerStates() {
         return producers.values();
     }
+	public ProducerState getProducerState(ProducerId producerId) {
+		return (ProducerState) producers.get(producerId);
+	}
     
     public Collection getConsumerStates() {
         return consumers.values();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 14 10:58:49 2006
@@ -28,6 +28,7 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.state.Tracked;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -89,7 +90,10 @@
 	                return;
 	            }
 	            if (command.isResponse()) {
-	                requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+                    Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+                    if( object!=null && object.getClass() == Tracked.class ) {
+                	   ((Tracked)object).onResponses();
+                    }
 	            }
 	            if (!initialized){
 	                if (command.isBrokerInfo()){
@@ -136,6 +140,8 @@
     
     public FailoverTransport() throws InterruptedIOException {
 
+    	stateTracker.setTrackTransactions(true);
+    	
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
 
@@ -372,7 +378,10 @@
                         // the state tracker,
                         // then hold it in the requestMap so that we can replay
                         // it later.
-                        if (!stateTracker.track(command) && command.isResponseRequired()) {
+                        Tracked tracked = stateTracker.track(command);
+                        if( tracked!=null && tracked.isWaitingForResponse() ) {
+                            requestMap.put(new Integer(command.getCommandId()), tracked);
+                        } else if ( tracked==null && command.isResponseRequired()) {
                             requestMap.put(new Integer(command.getCommandId()), command);
                         }
                                                 
@@ -380,13 +389,20 @@
                         try {
                             connectedTransport.oneway(command);
                         } catch (IOException e) {
-                            // If there is an IOException in the send, remove the command from the requestMap
-                            if (!stateTracker.track(command) && command.isResponseRequired()) {
-                                requestMap.remove(new Integer(command.getCommandId()), command);
-                            }
-                            
-                            // Rethrow the exception so it will handled by the outer catch
-                            throw e;
+                        	
+                        	// If the command was not tracked.. we will retry in this method
+                        	if( tracked==null ) {
+                        		
+                        		// since we will retry in this method.. take it out of the request
+                        		// map so that it is not sent 2 times on recovery
+                            	if( command.isResponseRequired() ) {
+                            		requestMap.remove(new Integer(command.getCommandId()));
+                            	}
+                            	
+                                // Rethrow the exception so it will handled by the outer catch
+                                throw e;
+                        	}
+                        	
                         }
                         
                         return;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=443430&r1=443429&r2=443430
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Sep 14 10:58:49 2006
@@ -340,7 +340,7 @@
                 // then hold it in the requestMap so that we can replay
                 // it later.
                 boolean fanout = isFanoutCommand(command);
-                if (!stateTracker.track(command) && command.isResponseRequired() ) {
+                if (stateTracker.track(command)==null && command.isResponseRequired() ) {
                     int size = fanout ? minAckCount : 1;
                     requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
                 }