You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/02/26 16:00:38 UTC
svn commit: r631244 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
state/ConnectionStateTracker.java transport/ResponseCorrelator.java
transport/failover/FailoverTransport.java
Author: rajdavies
Date: Tue Feb 26 07:00:37 2008
New Revision: 631244
URL: http://svn.apache.org/viewvc?rev=631244&view=rev
Log:
optionally replay messages on the fault tolerant transport
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Tue Feb 26 07:00:37 2008
@@ -18,6 +18,8 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.command.Command;
@@ -28,6 +30,7 @@
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
@@ -48,14 +51,24 @@
private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
-
+
private boolean trackTransactions;
private boolean restoreSessions = true;
private boolean restoreConsumers = true;
private boolean restoreProducers = true;
private boolean restoreTransaction = true;
-
-
+ private boolean trackMessages = true;
+ private int maxCacheSize = 128 * 1024;
+ private int currentCacheSize;
+ private Map<MessageId,Message> messageCache = new LinkedHashMap<MessageId,Message>(){
+ protected boolean removeEldestEntry(Map.Entry<MessageId,Message> eldest) {
+ boolean result = currentCacheSize > maxCacheSize;
+ currentCacheSize -= eldest.getValue().getSize();
+ return result;
+ }
+ };
+
+
private class RemoveTransactionAction implements Runnable {
private final TransactionInfo info;
@@ -86,6 +99,15 @@
throw IOExceptionSupport.create(e);
}
}
+
+ public void trackBack(Command command) {
+ if (trackMessages && command != null && command.isMessage()) {
+ Message message = (Message) command;
+ if (message.getTransactionId()==null) {
+ currentCacheSize+=message.getSize();
+ }
+ }
+ }
public void restore(Transport transport) throws IOException {
// Restore the connections.
@@ -102,6 +124,10 @@
restoreTransactions(transport, connectionState);
}
}
+ //now flush messages
+ for (Message msg:messageCache.values()) {
+ transport.oneway(msg);
+ }
}
private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
@@ -311,18 +337,22 @@
}
public Response processMessage(Message send) throws Exception {
- if (trackTransactions && send != null && send.getTransactionId() != null) {
- ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
- if (connectionId != null) {
- ConnectionState cs = connectionStates.get(connectionId);
- if (cs != null) {
- TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
- if (transactionState != null) {
- transactionState.addCommand(send);
+ if (send != null) {
+ if (trackTransactions && send.getTransactionId() != null) {
+ ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+ if (connectionId != null) {
+ ConnectionState cs = connectionStates.get(connectionId);
+ if (cs != null) {
+ TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+ if (transactionState != null) {
+ transactionState.addCommand(send);
+ }
}
}
+ return TRACKED_RESPONSE_MARKER;
+ }else if (trackMessages) {
+ messageCache.put(send.getMessageId(), send.copy());
}
- return TRACKED_RESPONSE_MARKER;
}
return null;
}
@@ -481,6 +511,22 @@
public void setRestoreTransaction(boolean restoreTransaction) {
this.restoreTransaction = restoreTransaction;
+ }
+
+ public boolean isTrackMessages() {
+ return trackMessages;
+ }
+
+ public void setTrackMessages(boolean trackMessages) {
+ this.trackMessages = trackMessages;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ResponseCorrelator.java Tue Feb 26 07:00:37 2008
@@ -93,7 +93,7 @@
future.set(response);
} else {
if (debug) {
- LOG.debug("Received unexpected response for command id: " + response.getCorrelationId());
+ LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
}
}
} else {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=631244&r1=631243&r2=631244&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Feb 26 07:00:37 2008
@@ -32,6 +32,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
@@ -92,6 +93,8 @@
private boolean backup=false;
private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
private int backupPoolSize=1;
+ private boolean trackMessages = true;
+ private int maxCacheSize = 128 * 1024;
private final TransportListener myTransportListener = createTransportListener();
@@ -223,6 +226,8 @@
return;
}
started = true;
+ stateTracker.setMaxCacheSize(getMaxCacheSize());
+ stateTracker.setTrackMessages(isTrackMessages());
if (connectedTransport != null) {
stateTracker.restore(connectedTransport);
} else {
@@ -336,6 +341,22 @@
this.backupPoolSize = backupPoolSize;
}
+ public boolean isTrackMessages() {
+ return trackMessages;
+ }
+
+ public void setTrackMessages(boolean trackMessages) {
+ this.trackMessages = trackMessages;
+ }
+
+ public int getMaxCacheSize() {
+ return maxCacheSize;
+ }
+
+ public void setMaxCacheSize(int maxCacheSize) {
+ this.maxCacheSize = maxCacheSize;
+ }
+
/**
* @return Returns true if the command is one sent when a connection
* is being closed.
@@ -407,6 +428,7 @@
// Send the message.
try {
connectedTransport.oneway(command);
+ stateTracker.trackBack(command);
} catch (IOException e) {
// If the command was not tracked.. we will retry in
@@ -548,6 +570,10 @@
protected void restoreTransport(Transport t) throws Exception, IOException {
t.start();
+ //send information to the broker - informing it we are an ft client
+ ConnectionControl cc = new ConnectionControl();
+ cc.setFaultTolerant(true);
+ t.oneway(cc);
stateTracker.restore(t);
for (Iterator<Command> iter2 = requestMap.values().iterator(); iter2.hasNext();) {
Command command = iter2.next();
@@ -753,5 +779,4 @@
public void reconnect(URI uri) throws IOException {
add(new URI[] {uri});
}
-
}