You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/14 19:38:33 UTC

svn commit: r443425 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq: broker/ state/ transport/failover/ transport/fanout/

Author: chirino
Date: Thu Sep 14 10:38:32 2006
New Revision: 443425

URL: http://svn.apache.org/viewvc?view=rev&rev=443425
Log:
Rolling back commit 443271 since it is breaking the build

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Sep 14 10:38:32 2006
@@ -62,7 +62,6 @@
 import org.apache.activemq.state.ConsumerState;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.state.SessionState;
-import org.apache.activemq.state.TransactionState;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -309,12 +308,7 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        
-        // Avoid replaying dup commands
-        if( cs.getTransactionState(info.getTransactionId())==null ) {
-        	cs.addTransactionState(info.getTransactionId());
-            broker.beginTransaction(context, info.getTransactionId());
-        }
+        broker.beginTransaction(context, info.getTransactionId());
         return null;
     }
     
@@ -331,22 +325,9 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        
-        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-        if( transactionState == null )
-            throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
-
-        // Avoid dups.
-        if( !transactionState.isPrepared() ) {
-            transactionState.setPrepared(true);
-            int result = broker.prepareTransaction(context, info.getTransactionId());
-            transactionState.setPreparedResult(result);
-            IntegerResponse response = new IntegerResponse(result);
-            return response;
-        } else {
-            IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
-            return response;
-        }
+        int result = broker.prepareTransaction(context, info.getTransactionId());
+        IntegerResponse response = new IntegerResponse(result);
+        return response;
     }
 
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@@ -355,12 +336,8 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        
-        cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context, info.getTransactionId(), true);
-
         return null;
-        
     }
 
     public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@@ -369,8 +346,6 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        
-        cs.removeTransactionState(info.getTransactionId());
         broker.commitTransaction(context, info.getTransactionId(), false);
         return null;
     }
@@ -381,8 +356,6 @@
         if( cs!=null ) {
            context = cs.getContext();
         }
-        
-        cs.removeTransactionState(info.getTransactionId());
         broker.rollbackTransaction(context, info.getTransactionId());
         return null;
     }
@@ -409,32 +382,10 @@
 
 
     public Response processMessage(Message messageSend) throws Exception {
-    	
         ProducerId producerId = messageSend.getProducerId();
         ConnectionState state = lookupConnectionState(producerId);
         ConnectionContext context = state.getContext();
-        
-        // If the message originates from this client connection, 
-        // then, finde the associated producer state so we can do some dup detection.
-        ProducerState producerState=null;        
-        if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
-	        SessionState ss = state.getSessionState(producerId.getParentId());
-	        if( ss == null )
-	            throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
-	        producerState = ss.getProducerState(producerId); 
-        }
-        
-        if( producerState == null ) {
-            broker.send(context, messageSend);
-        } else {
-	        // Avoid Dups.
-	        long seq = messageSend.getMessageId().getProducerSequenceId();
-	        if( seq > producerState.getLastSequenceId() ) {
-	        	producerState.setLastSequenceId(seq);
-	            broker.send(context, messageSend);
-	        }
-        }
-        
+        broker.send(context, messageSend);
         return null;
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Thu Sep 14 10:38:32 2006
@@ -30,7 +30,6 @@
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
 
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,6 @@
 public class ConnectionState {
     
     final ConnectionInfo info;
-    private final ConcurrentHashMap transactions = new ConcurrentHashMap();
     private final ConcurrentHashMap sessions = new ConcurrentHashMap();
     private final List tempDestinations = Collections.synchronizedList(new ArrayList());
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -65,20 +63,6 @@
                 iter.remove();
             }
         }
-    }
-	
-    public void addTransactionState(TransactionId id) {
-    	checkShutdown();
-    	transactions.put(id, new TransactionState(id));
-    }        
-    public TransactionState getTransactionState(TransactionId id) {
-        return (TransactionState)transactions.get(id);
-    }
-    public Collection getTransactionStates() {
-        return transactions.values();
-    }
-    public TransactionState removeTransactionState(TransactionId id) {
-        return (TransactionState) transactions.remove(id);
     }
 
     public void addSession(SessionInfo info) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Sep 14 10:38:32 2006
@@ -55,41 +55,21 @@
  */
 public class ConnectionStateTracker implements CommandVisitor {
 
-	private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+    private final static Response TRACKED_RESPONSE_MARKER = new Response();
     
-	private boolean trackTransactions = false;
+    boolean trackTransactions = false;
+    boolean trackMessages = false;
+    boolean trackAcks = false;
     
     private boolean restoreSessions=true;
-    private boolean restoreConsumers=true;
+    boolean restoreConsumers=true;
     private boolean restoreProducers=true;
-    private boolean restoreTransaction=true;
     
     protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-        
-    private class RemoveTransactionAction implements Runnable {
-		private final TransactionInfo info;
-		public RemoveTransactionAction(TransactionInfo info) {
-			this.info = info;
-		}
-		public void run() {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
-	        if( cs != null ) {
-	        	cs.removeTransactionState(info.getTransactionId());
-	        }
-		}
-    }
-
-    /**
-     * 
-     * 
-     * @param command
-     * @return null if the command is not state tracked.
-     * @throws IOException
-     */
-    public Tracked track(Command command) throws IOException {
+    
+    public boolean track(Command command) throws IOException {
         try {
-        	return (Tracked) command.visit(this);
+            return command.visit(this)!=null;
         } catch (IOException e) {
             throw e;
         } catch (Throwable e) {
@@ -106,22 +86,9 @@
             
             if( restoreSessions )
                 restoreSessions(transport, connectionState);
-            
-            if( restoreTransaction )
-            	restoreTransactions(transport, connectionState);
         }
     }
 
-    private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
-    	for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
-			TransactionState transactionState = (TransactionState) iter.next();
-			for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
-				Command command = (Command) iterator.next();
-	            transport.oneway(command);
-			}
-		}
-	}
-
     /**
      * @param transport
      * @param connectionState
@@ -260,103 +227,26 @@
         return null;
     }
     public Response processMessage(Message send) throws Exception {
-    	if( trackTransactions && send.getTransactionId() != null ) {
-            ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
-            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-            TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
-            transactionState.addCommand(send);    		
-            return TRACKED_RESPONSE_MARKER;
-    	}
-    	return null;
-    }    
+        return null;
+    }
     public Response processMessageAck(MessageAck ack) throws Exception {
-    	if( trackTransactions && ack.getTransactionId() != null ) {
-            ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
-            ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-            TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
-            transactionState.addCommand(ack);    		
-            return TRACKED_RESPONSE_MARKER;
-    	}
-    	return null;
+        return null;
     }
-    
     public Response processBeginTransaction(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        cs.addTransactionState(info.getTransactionId());
-	        return TRACKED_RESPONSE_MARKER;
-    	}
-    	return null;
-    }    
+        return null;
+    }
     public Response processPrepareTransaction(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-	        transactionState.addCommand(info);
-	        return TRACKED_RESPONSE_MARKER;
-    	} 
-    	return null;
+        return null;
     }
-    
     public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-	        if( transactionState !=null ) {
-		        transactionState.addCommand(info);
-		        return new Tracked(new RemoveTransactionAction(info));
-	        }
-    	}
-    	return null;
-    }        
-    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-	        if( transactionState !=null ) {
-		        transactionState.addCommand(info);
-		        return new Tracked(new RemoveTransactionAction(info));
-	        }
-    	}
-    	return null;
-    }
-    
-    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-	        if( transactionState !=null ) {
-		        transactionState.addCommand(info);
-		        return new Tracked(new RemoveTransactionAction(info));
-	        }
-    	}
-    	return null;
-    }
-    
-    public Response processEndTransaction(TransactionInfo info) throws Exception {
-    	if( trackTransactions ) {
-	        ConnectionId connectionId = info.getConnectionId();
-	        ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
-	        TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-	        transactionState.addCommand(info);
-	        return TRACKED_RESPONSE_MARKER;
-    	}
-    	return null;
+        return null;
     }
-    
-    public Response processRecoverTransactions(TransactionInfo info) {
+    public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
         return null;
     }
-    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+    public Response processRollbackTransaction(TransactionInfo info) throws Exception {
         return null;
     }
-
-    
     public Response processWireFormat(WireFormatInfo info) throws Exception {
         return null;
     }
@@ -370,6 +260,18 @@
         return null;
     }
 
+    public Response processRecoverTransactions(TransactionInfo info) {
+        return null;
+    }
+
+    public Response processForgetTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
+    public Response processEndTransaction(TransactionInfo info) throws Exception {
+        return null;
+    }
+
     public Response processFlush(FlushCommand command) throws Exception {
         return null;
     }
@@ -405,20 +307,5 @@
     public void setRestoreSessions(boolean restoreSessions) {
         this.restoreSessions = restoreSessions;
     }
-
-	public boolean isTrackTransactions() {
-		return trackTransactions;
-	}
-
-	public void setTrackTransactions(boolean trackTransactions) {
-		this.trackTransactions = trackTransactions;
-	}
-
-	public boolean isRestoreTransaction() {
-		return restoreTransaction;
-	}
-
-	public void setRestoreTransaction(boolean restoreTransaction) {
-		this.restoreTransaction = restoreTransaction;
-	}
+        
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Sep 14 10:38:32 2006
@@ -22,7 +22,6 @@
 
 public class ProducerState {        
     final ProducerInfo info;  
-	private long lastSequenceId=-1;  
     
     public ProducerState(ProducerInfo info) {
         this.info = info;
@@ -32,11 +31,5 @@
     }
     public ProducerInfo getInfo() {
         return info;
-    }
-	public void setLastSequenceId(long lastSequenceId) {
-		this.lastSequenceId = lastSequenceId;		
-	}
-	public long getLastSequenceId() {
-		return lastSequenceId;
-	}        
+    }        
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Sep 14 10:38:32 2006
@@ -74,9 +74,6 @@
     public Collection getProducerStates() {
         return producers.values();
     }
-	public ProducerState getProducerState(ProducerId producerId) {
-		return (ProducerState) producers.get(producerId);
-	}
     
     public Collection getConsumerStates() {
         return consumers.values();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 14 10:38:32 2006
@@ -28,7 +28,6 @@
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
-import org.apache.activemq.state.Tracked;
 import org.apache.activemq.thread.DefaultThreadPools;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
@@ -90,10 +89,7 @@
 	                return;
 	            }
 	            if (command.isResponse()) {
-                        Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
-                        if( object!=null && object.getClass() == Tracked.class ) {
-                             ((Tracked)object).onResponses();
-                        }
+	                requestMap.remove(new Integer(((Response) command).getCorrelationId()));
 	            }
 	            if (!initialized){
 	                if (command.isBrokerInfo()){
@@ -140,8 +136,6 @@
     
     public FailoverTransport() throws InterruptedIOException {
 
-    	stateTracker.setTrackTransactions(true);
-    	
         // Setup a task that is used to reconnect the a connection async.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
 
@@ -378,10 +372,7 @@
                         // the state tracker,
                         // then hold it in the requestMap so that we can replay
                         // it later.
-                        Tracked tracked = stateTracker.track(command);
-                        if( tracked!=null && tracked.isWaitingForResponse() ) {
-                            requestMap.put(new Integer(command.getCommandId()), tracked);
-                        } else if ( tracked==null && command.isResponseRequired()) {
+                        if (!stateTracker.track(command) && command.isResponseRequired()) {
                             requestMap.put(new Integer(command.getCommandId()), command);
                         }
                                                 
@@ -389,19 +380,13 @@
                         try {
                             connectedTransport.oneway(command);
                         } catch (IOException e) {
-                        	
-                        	// If the command was not tracked.. we will retry in this method
-                        	if( tracked==null ) {
-                        		
-                        		// since we will retry in this method.. take it out of the request
-                        		// map so that it is not sent 2 times on recovery
-                            	if( command.isResponseRequired() ) {
-                            		requestMap.remove(new Integer(command.getCommandId()));
-                            	}
-                            	
-                                // Rethrow the exception so it will handled by the outer catch
-                                throw e;
-                        	}
+                            // If there is an IOException in the send, remove the command from the requestMap
+                            if (!stateTracker.track(command) && command.isResponseRequired()) {
+                                requestMap.remove(new Integer(command.getCommandId()), command);
+                            }
+                            
+                            // Rethrow the exception so it will be handled by the outer catch
+                            throw e;
                         }
                         
                         return;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Sep 14 10:38:32 2006
@@ -340,7 +340,7 @@
                 // then hold it in the requestMap so that we can replay
                 // it later.
                 boolean fanout = isFanoutCommand(command);
-                if (stateTracker.track(command)==null && command.isResponseRequired() ) {
+                if (!stateTracker.track(command) && command.isResponseRequired() ) {
                     int size = fanout ? minAckCount : 1;
                     requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
                 }