You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/14 19:38:33 UTC
svn commit: r443425 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
broker/ state/ transport/failover/ transport/fanout/
Author: chirino
Date: Thu Sep 14 10:38:32 2006
New Revision: 443425
URL: http://svn.apache.org/viewvc?view=rev&rev=443425
Log:
Rolling back commit 443271 since it is breaking the build
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Sep 14 10:38:32 2006
@@ -62,7 +62,6 @@
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
-import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -309,12 +308,7 @@
if( cs!=null ) {
context = cs.getContext();
}
-
- // Avoid replaying dup commands
- if( cs.getTransactionState(info.getTransactionId())==null ) {
- cs.addTransactionState(info.getTransactionId());
- broker.beginTransaction(context, info.getTransactionId());
- }
+ broker.beginTransaction(context, info.getTransactionId());
return null;
}
@@ -331,22 +325,9 @@
if( cs!=null ) {
context = cs.getContext();
}
-
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if( transactionState == null )
- throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
-
- // Avoid dups.
- if( !transactionState.isPrepared() ) {
- transactionState.setPrepared(true);
- int result = broker.prepareTransaction(context, info.getTransactionId());
- transactionState.setPreparedResult(result);
- IntegerResponse response = new IntegerResponse(result);
- return response;
- } else {
- IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
- return response;
- }
+ int result = broker.prepareTransaction(context, info.getTransactionId());
+ IntegerResponse response = new IntegerResponse(result);
+ return response;
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@@ -355,12 +336,8 @@
if( cs!=null ) {
context = cs.getContext();
}
-
- cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), true);
-
return null;
-
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@@ -369,8 +346,6 @@
if( cs!=null ) {
context = cs.getContext();
}
-
- cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), false);
return null;
}
@@ -381,8 +356,6 @@
if( cs!=null ) {
context = cs.getContext();
}
-
- cs.removeTransactionState(info.getTransactionId());
broker.rollbackTransaction(context, info.getTransactionId());
return null;
}
@@ -409,32 +382,10 @@
public Response processMessage(Message messageSend) throws Exception {
-
ProducerId producerId = messageSend.getProducerId();
ConnectionState state = lookupConnectionState(producerId);
ConnectionContext context = state.getContext();
-
- // If the message originates from this client connection,
- // then, finde the associated producer state so we can do some dup detection.
- ProducerState producerState=null;
- if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
- SessionState ss = state.getSessionState(producerId.getParentId());
- if( ss == null )
- throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
- producerState = ss.getProducerState(producerId);
- }
-
- if( producerState == null ) {
- broker.send(context, messageSend);
- } else {
- // Avoid Dups.
- long seq = messageSend.getMessageId().getProducerSequenceId();
- if( seq > producerState.getLastSequenceId() ) {
- producerState.setLastSequenceId(seq);
- broker.send(context, messageSend);
- }
- }
-
+ broker.send(context, messageSend);
return null;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Thu Sep 14 10:38:32 2006
@@ -30,7 +30,6 @@
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.TransactionId;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -38,7 +37,6 @@
public class ConnectionState {
final ConnectionInfo info;
- private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -65,20 +63,6 @@
iter.remove();
}
}
- }
-
- public void addTransactionState(TransactionId id) {
- checkShutdown();
- transactions.put(id, new TransactionState(id));
- }
- public TransactionState getTransactionState(TransactionId id) {
- return (TransactionState)transactions.get(id);
- }
- public Collection getTransactionStates() {
- return transactions.values();
- }
- public TransactionState removeTransactionState(TransactionId id) {
- return (TransactionState) transactions.remove(id);
}
public void addSession(SessionInfo info) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Sep 14 10:38:32 2006
@@ -55,41 +55,21 @@
*/
public class ConnectionStateTracker implements CommandVisitor {
- private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+ private final static Response TRACKED_RESPONSE_MARKER = new Response();
- private boolean trackTransactions = false;
+ boolean trackTransactions = false;
+ boolean trackMessages = false;
+ boolean trackAcks = false;
private boolean restoreSessions=true;
- private boolean restoreConsumers=true;
+ boolean restoreConsumers=true;
private boolean restoreProducers=true;
- private boolean restoreTransaction=true;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-
- private class RemoveTransactionAction implements Runnable {
- private final TransactionInfo info;
- public RemoveTransactionAction(TransactionInfo info) {
- this.info = info;
- }
- public void run() {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
- if( cs != null ) {
- cs.removeTransactionState(info.getTransactionId());
- }
- }
- }
-
- /**
- *
- *
- * @param command
- * @return null if the command is not state tracked.
- * @throws IOException
- */
- public Tracked track(Command command) throws IOException {
+
+ public boolean track(Command command) throws IOException {
try {
- return (Tracked) command.visit(this);
+ return command.visit(this)!=null;
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@@ -106,22 +86,9 @@
if( restoreSessions )
restoreSessions(transport, connectionState);
-
- if( restoreTransaction )
- restoreTransactions(transport, connectionState);
}
}
- private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
- for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
- TransactionState transactionState = (TransactionState) iter.next();
- for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
- Command command = (Command) iterator.next();
- transport.oneway(command);
- }
- }
- }
-
/**
* @param transport
* @param connectionState
@@ -260,103 +227,26 @@
return null;
}
public Response processMessage(Message send) throws Exception {
- if( trackTransactions && send.getTransactionId() != null ) {
- ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
- transactionState.addCommand(send);
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
+ return null;
+ }
public Response processMessageAck(MessageAck ack) throws Exception {
- if( trackTransactions && ack.getTransactionId() != null ) {
- ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
- transactionState.addCommand(ack);
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
+ return null;
}
-
public Response processBeginTransaction(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- cs.addTransactionState(info.getTransactionId());
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
- }
+ return null;
+ }
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- transactionState.addCommand(info);
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
+ return null;
}
-
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if( transactionState !=null ) {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info));
- }
- }
- return null;
- }
- public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if( transactionState !=null ) {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info));
- }
- }
- return null;
- }
-
- public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if( transactionState !=null ) {
- transactionState.addCommand(info);
- return new Tracked(new RemoveTransactionAction(info));
- }
- }
- return null;
- }
-
- public Response processEndTransaction(TransactionInfo info) throws Exception {
- if( trackTransactions ) {
- ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
- TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- transactionState.addCommand(info);
- return TRACKED_RESPONSE_MARKER;
- }
- return null;
+ return null;
}
-
- public Response processRecoverTransactions(TransactionInfo info) {
+ public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
return null;
}
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ public Response processRollbackTransaction(TransactionInfo info) throws Exception {
return null;
}
-
-
public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
@@ -370,6 +260,18 @@
return null;
}
+ public Response processRecoverTransactions(TransactionInfo info) {
+ return null;
+ }
+
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ return null;
+ }
+
public Response processFlush(FlushCommand command) throws Exception {
return null;
}
@@ -405,20 +307,5 @@
public void setRestoreSessions(boolean restoreSessions) {
this.restoreSessions = restoreSessions;
}
-
- public boolean isTrackTransactions() {
- return trackTransactions;
- }
-
- public void setTrackTransactions(boolean trackTransactions) {
- this.trackTransactions = trackTransactions;
- }
-
- public boolean isRestoreTransaction() {
- return restoreTransaction;
- }
-
- public void setRestoreTransaction(boolean restoreTransaction) {
- this.restoreTransaction = restoreTransaction;
- }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Sep 14 10:38:32 2006
@@ -22,7 +22,6 @@
public class ProducerState {
final ProducerInfo info;
- private long lastSequenceId=-1;
public ProducerState(ProducerInfo info) {
this.info = info;
@@ -32,11 +31,5 @@
}
public ProducerInfo getInfo() {
return info;
- }
- public void setLastSequenceId(long lastSequenceId) {
- this.lastSequenceId = lastSequenceId;
- }
- public long getLastSequenceId() {
- return lastSequenceId;
- }
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Sep 14 10:38:32 2006
@@ -74,9 +74,6 @@
public Collection getProducerStates() {
return producers.values();
}
- public ProducerState getProducerState(ProducerId producerId) {
- return (ProducerState) producers.get(producerId);
- }
public Collection getConsumerStates() {
return consumers.values();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 14 10:38:32 2006
@@ -28,7 +28,6 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
-import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@@ -90,10 +89,7 @@
return;
}
if (command.isResponse()) {
- Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
- if( object!=null && object.getClass() == Tracked.class ) {
- ((Tracked)object).onResponses();
- }
+ requestMap.remove(new Integer(((Response) command).getCorrelationId()));
}
if (!initialized){
if (command.isBrokerInfo()){
@@ -140,8 +136,6 @@
public FailoverTransport() throws InterruptedIOException {
- stateTracker.setTrackTransactions(true);
-
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@@ -378,10 +372,7 @@
// the state tracker,
// then hold it in the requestMap so that we can replay
// it later.
- Tracked tracked = stateTracker.track(command);
- if( tracked!=null && tracked.isWaitingForResponse() ) {
- requestMap.put(new Integer(command.getCommandId()), tracked);
- } else if ( tracked==null && command.isResponseRequired()) {
+ if (!stateTracker.track(command) && command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), command);
}
@@ -389,19 +380,13 @@
try {
connectedTransport.oneway(command);
} catch (IOException e) {
-
- // If the command was not tracked.. we will retry in this method
- if( tracked==null ) {
-
- // since we will retry in this method.. take it out of the request
- // map so that it is not sent 2 times on recovery
- if( command.isResponseRequired() ) {
- requestMap.remove(new Integer(command.getCommandId()));
- }
-
- // Rethrow the exception so it will handled by the outer catch
- throw e;
- }
+ // If there is an IOException in the send, remove the command from the requestMap
+ if (!stateTracker.track(command) && command.isResponseRequired()) {
+ requestMap.remove(new Integer(command.getCommandId()), command);
+ }
+
+ // Rethrow the exception so it will handled by the outer catch
+ throw e;
}
return;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=443425&r1=443424&r2=443425
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Sep 14 10:38:32 2006
@@ -340,7 +340,7 @@
// then hold it in the requestMap so that we can replay
// it later.
boolean fanout = isFanoutCommand(command);
- if (stateTracker.track(command)==null && command.isResponseRequired() ) {
+ if (!stateTracker.track(command) && command.isResponseRequired() ) {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}