You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/26 16:00:38 UTC

svn commit: r631244 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: state/ConnectionStateTracker.java transport/ResponseCorrelator.java transport/failover/FailoverTransport.java

Author: rajdavies
Date: Tue Feb 26 07:00:37 2008
New Revision: 631244

URL: http://svn.apache.org/viewvc?rev=631244&view=rev
Log:
optionally replay messages on the fault tolerant transport

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Tue Feb 26 07:00:37 2008
@@ -18,6 +18,8 @@
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.command.Command;
@@ -28,6 +30,7 @@
 import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.Response;
@@ -48,14 +51,24 @@
     private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
 
     protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
-
+     
     private boolean trackTransactions;
     private boolean restoreSessions = true;
     private boolean restoreConsumers = true;
     private boolean restoreProducers = true;
     private boolean restoreTransaction = true;
-
-
+    private boolean trackMessages = true;
+    private int maxCacheSize = 128 * 1024;
+    private int currentCacheSize;
+    private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
+        protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
+            boolean result = currentCacheSize > maxCacheSize; // evict the eldest message only while the cache is over budget
+            if (result) { currentCacheSize -= eldest.getValue().getSize(); } // give bytes back only for an entry that is actually removed
+            return result;
+        }
+    };
+    
+    
     private class RemoveTransactionAction implements Runnable {
         private final TransactionInfo info;
 
@@ -86,6 +99,15 @@
             throw IOExceptionSupport.create(e);
         }
     }
+    
+    public void trackBack(Command command) {
+        if (trackMessages && command != null && command.isMessage()) {
+            Message message = (Message) command;
+            if (message.getTransactionId()==null) {
+                currentCacheSize+=message.getSize();
+            }
+        }
+    }
 
     public void restore(Transport transport) throws IOException {
         // Restore the connections.
@@ -102,6 +124,10 @@
                 restoreTransactions(transport, connectionState);
             }
         }
+        //now flush messages
+        for (Message msg:messageCache.values()) {
+            transport.oneway(msg);
+        }
     }
 
     private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
@@ -311,18 +337,22 @@
     }
 
     public Response processMessage(Message send) throws Exception {
-        if (trackTransactions && send != null && send.getTransactionId() != null) {
-            ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
-            if (connectionId != null) {
-                ConnectionState cs = connectionStates.get(connectionId);
-                if (cs != null) {
-                    TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
-                    if (transactionState != null) {
-                        transactionState.addCommand(send);
+        if (send != null) {
+            if (trackTransactions && send.getTransactionId() != null) {
+                ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+                if (connectionId != null) {
+                    ConnectionState cs = connectionStates.get(connectionId);
+                    if (cs != null) {
+                        TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+                        if (transactionState != null) {
+                            transactionState.addCommand(send);
+                        }
                     }
                 }
+                return TRACKED_RESPONSE_MARKER;
+            }else if (trackMessages) {
+                messageCache.put(send.getMessageId(), send.copy());
             }
-            return TRACKED_RESPONSE_MARKER;
         }
         return null;
     }
@@ -481,6 +511,22 @@
 
     public void setRestoreTransaction(boolean restoreTransaction) {
         this.restoreTransaction = restoreTransaction;
+    }
+
+    public boolean isTrackMessages() {
+        return trackMessages;
+    }
+
+    public void setTrackMessages(boolean trackMessages) {
+        this.trackMessages = trackMessages;
+    }
+
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Tue Feb 26 07:00:37 2008
@@ -93,7 +93,7 @@
                 future.set(response);
             } else {
                 if (debug) {
-                    LOG.debug("Received unexpected response for command id: " + response.getCorrelationId());
+                    LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
                 }
             }
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Feb 26 07:00:37 2008
@@ -32,6 +32,7 @@
 
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.state.ConnectionStateTracker;
@@ -92,6 +93,8 @@
     private boolean backup=false;
     private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
     private int backupPoolSize=1;
+    private boolean trackMessages = true;
+    private int maxCacheSize = 128 * 1024;
     
 
     private final TransportListener myTransportListener = createTransportListener();
@@ -223,6 +226,8 @@
                 return;
             }
             started = true;
+            stateTracker.setMaxCacheSize(getMaxCacheSize());
+            stateTracker.setTrackMessages(isTrackMessages());
             if (connectedTransport != null) {
                 stateTracker.restore(connectedTransport);
             } else {
@@ -336,6 +341,22 @@
 		this.backupPoolSize = backupPoolSize;
 	}
 	
+	public boolean isTrackMessages() {
+        return trackMessages;
+    }
+
+    public void setTrackMessages(boolean trackMessages) {
+        this.trackMessages = trackMessages;
+    }
+
+    public int getMaxCacheSize() {
+        return maxCacheSize;
+    }
+
+    public void setMaxCacheSize(int maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+    }
+	
     /**
      * @return Returns true if the command is one sent when a connection
      * is being closed.
@@ -407,6 +428,7 @@
                         // Send the message.
                         try {
                             connectedTransport.oneway(command);
+                            stateTracker.trackBack(command);
                         } catch (IOException e) {
 
                             // If the command was not tracked.. we will retry in
@@ -548,6 +570,10 @@
 
     protected void restoreTransport(Transport t) throws Exception, IOException {
         t.start();
+        //send information to the broker - informing it we are an ft client
+        ConnectionControl cc = new ConnectionControl();
+        cc.setFaultTolerant(true);
+        t.oneway(cc);
         stateTracker.restore(t);
         for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
             Command command = iter2.next();
@@ -753,5 +779,4 @@
     public void reconnect(URI uri) throws IOException {
     	add(new URI[] {uri});
     }
-
 }