You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/09/14 19:30:55 UTC
svn commit: r443423 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq:
broker/ state/ transport/failover/ transport/fanout/
Author: chirino
Date: Thu Sep 14 10:30:54 2006
New Revision: 443423
URL: http://svn.apache.org/viewvc?view=rev&rev=443423
Log:
https://issues.apache.org/activemq/browse/AMQ-915
Added:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java Thu Sep 14 10:30:54 2006
@@ -61,6 +61,7 @@
import org.apache.activemq.state.ConsumerState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
+import org.apache.activemq.state.TransactionState;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -307,7 +308,12 @@
if( cs!=null ) {
context = cs.getContext();
}
- broker.beginTransaction(context, info.getTransactionId());
+
+ // Avoid replaying dup commands
+ if( cs.getTransactionState(info.getTransactionId())==null ) {
+ cs.addTransactionState(info.getTransactionId());
+ broker.beginTransaction(context, info.getTransactionId());
+ }
return null;
}
@@ -324,9 +330,22 @@
if( cs!=null ) {
context = cs.getContext();
}
- int result = broker.prepareTransaction(context, info.getTransactionId());
- IntegerResponse response = new IntegerResponse(result);
- return response;
+
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if( transactionState == null )
+ throw new IllegalStateException("Cannot prepare a transaction that had not been started: "+info.getTransactionId());
+
+ // Avoid dups.
+ if( !transactionState.isPrepared() ) {
+ transactionState.setPrepared(true);
+ int result = broker.prepareTransaction(context, info.getTransactionId());
+ transactionState.setPreparedResult(result);
+ IntegerResponse response = new IntegerResponse(result);
+ return response;
+ } else {
+ IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult());
+ return response;
+ }
}
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
@@ -335,8 +354,12 @@
if( cs!=null ) {
context = cs.getContext();
}
+
+ cs.removeTransactionState(info.getTransactionId());
broker.commitTransaction(context, info.getTransactionId(), true);
+
return null;
+
}
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
@@ -345,7 +368,9 @@
if( cs!=null ) {
context = cs.getContext();
}
- broker.commitTransaction(context, info.getTransactionId(), false);
+
+ cs.removeTransactionState(info.getTransactionId());
+ broker.commitTransaction(context, info.getTransactionId(), false);
return null;
}
@@ -355,7 +380,9 @@
if( cs!=null ) {
context = cs.getContext();
}
- broker.rollbackTransaction(context, info.getTransactionId());
+
+ cs.removeTransactionState(info.getTransactionId());
+ broker.rollbackTransaction(context, info.getTransactionId());
return null;
}
@@ -381,10 +408,32 @@
public Response processMessage(Message messageSend) throws Exception {
+
ProducerId producerId = messageSend.getProducerId();
ConnectionState state = lookupConnectionState(producerId);
ConnectionContext context = state.getContext();
- broker.send(context, messageSend);
+
+ // If the message originates from this client connection,
+ // then, finde the associated producer state so we can do some dup detection.
+ ProducerState producerState=null;
+ if( messageSend.getMessageId().getProducerId().equals( messageSend.getProducerId() ) ) {
+ SessionState ss = state.getSessionState(producerId.getParentId());
+ if( ss == null )
+ throw new IllegalStateException("Cannot send from a session that had not been registered: "+producerId.getParentId());
+ producerState = ss.getProducerState(producerId);
+ }
+
+ if( producerState == null ) {
+ broker.send(context, messageSend);
+ } else {
+ // Avoid Dups.
+ long seq = messageSend.getMessageId().getProducerSequenceId();
+ if( seq > producerState.getLastSequenceId() ) {
+ producerState.setLastSequenceId(seq);
+ broker.send(context, messageSend);
+ }
+ }
+
return null;
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Thu Sep 14 10:30:54 2006
@@ -30,6 +30,7 @@
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@
public class ConnectionState {
final ConnectionInfo info;
+ private final ConcurrentHashMap transactions = new ConcurrentHashMap();
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new ArrayList());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -63,6 +65,20 @@
iter.remove();
}
}
+ }
+
+ public void addTransactionState(TransactionId id) {
+ checkShutdown();
+ transactions.put(id, new TransactionState(id));
+ }
+ public TransactionState getTransactionState(TransactionId id) {
+ return (TransactionState)transactions.get(id);
+ }
+ public Collection getTransactionStates() {
+ return transactions.values();
+ }
+ public TransactionState removeTransactionState(TransactionId id) {
+ return (TransactionState) transactions.remove(id);
}
public void addSession(SessionInfo info) {
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Thu Sep 14 10:30:54 2006
@@ -54,21 +54,39 @@
*/
public class ConnectionStateTracker implements CommandVisitor {
- private final static Response TRACKED_RESPONSE_MARKER = new Response();
+ private final static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
- boolean trackTransactions = false;
- boolean trackMessages = false;
- boolean trackAcks = false;
+ private boolean trackTransactions = false;
private boolean restoreSessions=true;
- boolean restoreConsumers=true;
+ private boolean restoreConsumers=true;
private boolean restoreProducers=true;
+ private boolean restoreTransaction=true;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
-
- public boolean track(Command command) throws IOException {
+
+ private class RemoveTransactionAction implements Runnable {
+ private final TransactionInfo info;
+ public RemoveTransactionAction(TransactionInfo info) {
+ this.info = info;
+ }
+ public void run() {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ cs.removeTransactionState(info.getTransactionId());
+ }
+ }
+
+ /**
+ *
+ *
+ * @param command
+ * @return null if the command is not state tracked.
+ * @throws IOException
+ */
+ public Tracked track(Command command) throws IOException {
try {
- return command.visit(this)!=null;
+ return (Tracked) command.visit(this);
} catch (IOException e) {
throw e;
} catch (Throwable e) {
@@ -85,10 +103,23 @@
if( restoreSessions )
restoreSessions(transport, connectionState);
+
+ if( restoreTransaction )
+ restoreTransactions(transport, connectionState);
}
}
- /**
+ private void restoreTransactions(Transport transport, ConnectionState connectionState) throws IOException {
+ for (Iterator iter = connectionState.getTransactionStates().iterator(); iter.hasNext();) {
+ TransactionState transactionState = (TransactionState) iter.next();
+ for (Iterator iterator = transactionState.getCommands().iterator(); iterator.hasNext();) {
+ Command command = (Command) iterator.next();
+ transport.oneway(command);
+ }
+ }
+ }
+
+ /**
* @param transport
* @param connectionState
* @throws IOException
@@ -226,48 +257,113 @@
return null;
}
public Response processMessage(Message send) throws Exception {
- return null;
- }
+ if( trackTransactions && send.getTransactionId() != null ) {
+ ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
+ transactionState.addCommand(send);
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
public Response processMessageAck(MessageAck ack) throws Exception {
- return null;
+ if( trackTransactions && ack.getTransactionId() != null ) {
+ ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
+ transactionState.addCommand(ack);
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
}
+
public Response processBeginTransaction(TransactionInfo info) throws Exception {
- return null;
- }
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ cs.addTransactionState(info.getTransactionId());
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
+ }
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
- return null;
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ transactionState.addCommand(info);
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
}
+
public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception {
- return null;
- }
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if( transactionState !=null ) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ return null;
+ }
public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception {
- return null;
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if( transactionState !=null ) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ return null;
}
+
public Response processRollbackTransaction(TransactionInfo info) throws Exception {
- return null;
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ if( transactionState !=null ) {
+ transactionState.addCommand(info);
+ return new Tracked(new RemoveTransactionAction(info));
+ }
+ }
+ return null;
}
- public Response processWireFormat(WireFormatInfo info) throws Exception {
- return null;
+
+ public Response processEndTransaction(TransactionInfo info) throws Exception {
+ if( trackTransactions ) {
+ ConnectionId connectionId = info.getConnectionId();
+ ConnectionState cs = (ConnectionState) connectionStates.get(connectionId);
+ TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
+ transactionState.addCommand(info);
+ return TRACKED_RESPONSE_MARKER;
+ }
+ return null;
}
- public Response processKeepAlive(KeepAliveInfo info) throws Exception {
+
+ public Response processRecoverTransactions(TransactionInfo info) {
return null;
}
- public Response processShutdown(ShutdownInfo info) throws Exception {
+ public Response processForgetTransaction(TransactionInfo info) throws Exception {
return null;
}
- public Response processBrokerInfo(BrokerInfo info) throws Exception {
+
+
+ public Response processWireFormat(WireFormatInfo info) throws Exception {
return null;
}
-
- public Response processRecoverTransactions(TransactionInfo info) {
+ public Response processKeepAlive(KeepAliveInfo info) throws Exception {
return null;
}
-
- public Response processForgetTransaction(TransactionInfo info) throws Exception {
+ public Response processShutdown(ShutdownInfo info) throws Exception {
return null;
}
-
- public Response processEndTransaction(TransactionInfo info) throws Exception {
+ public Response processBrokerInfo(BrokerInfo info) throws Exception {
return null;
}
@@ -302,4 +398,20 @@
public void setRestoreSessions(boolean restoreSessions) {
this.restoreSessions = restoreSessions;
}
+
+ public boolean isTrackTransactions() {
+ return trackTransactions;
+ }
+
+ public void setTrackTransactions(boolean trackTransactions) {
+ this.trackTransactions = trackTransactions;
+ }
+
+ public boolean isRestoreTransaction() {
+ return restoreTransaction;
+ }
+
+ public void setRestoreTransaction(boolean restoreTransaction) {
+ this.restoreTransaction = restoreTransaction;
+ }
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ProducerState.java Thu Sep 14 10:30:54 2006
@@ -21,7 +21,8 @@
import org.apache.activemq.command.ProducerInfo;
public class ProducerState {
- final ProducerInfo info;
+ final ProducerInfo info;
+ private long lastSequenceId=-1;
public ProducerState(ProducerInfo info) {
this.info = info;
@@ -31,5 +32,11 @@
}
public ProducerInfo getInfo() {
return info;
- }
+ }
+ public void setLastSequenceId(long lastSequenceId) {
+ this.lastSequenceId = lastSequenceId;
+ }
+ public long getLastSequenceId() {
+ return lastSequenceId;
+ }
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java Thu Sep 14 10:30:54 2006
@@ -69,11 +69,13 @@
}
public Set getProducerIds() {
return producers.keySet();
- }
-
+ }
public Collection getProducerStates() {
return producers.values();
}
+ public ProducerState getProducerState(ProducerId producerId) {
+ return (ProducerState) producers.get(producerId);
+ }
public Collection getConsumerStates() {
return consumers.values();
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java?view=auto&rev=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/Tracked.java Thu Sep 14 10:30:54 2006
@@ -0,0 +1,27 @@
+/**
+ *
+ */
+package org.apache.activemq.state;
+
+import org.apache.activemq.command.Response;
+
+public class Tracked extends Response {
+
+ private Runnable runnable;
+
+ public Tracked(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ public void onResponses() {
+ if( runnable != null ) {
+ runnable.run();
+ runnable=null;
+ }
+ }
+
+ public boolean isWaitingForResponse() {
+ return runnable!=null;
+ }
+
+}
\ No newline at end of file
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java?view=auto&rev=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/TransactionState.java Thu Sep 14 10:30:54 2006
@@ -0,0 +1,79 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.state;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.TransactionId;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+public class TransactionState {
+ final TransactionId id;
+
+ public final ArrayList commands = new ArrayList();
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ private boolean prepared;
+
+ private int preparedResult;
+
+ public TransactionState(TransactionId id) {
+ this.id = id;
+ }
+ public String toString() {
+ return id.toString();
+ }
+
+ public void addCommand(Command operation) {
+ checkShutdown();
+ commands.add(operation);
+ }
+
+ public List getCommands() {
+ return commands;
+ }
+
+ private void checkShutdown() {
+ if( shutdown.get() )
+ throw new IllegalStateException("Disposed");
+ }
+
+ public void shutdown() {
+ shutdown.set(false);
+ }
+ public TransactionId getId() {
+ return id;
+ }
+
+ public void setPrepared(boolean prepared) {
+ this.prepared = prepared;
+ }
+ public boolean isPrepared() {
+ return prepared;
+ }
+ public void setPreparedResult(int preparedResult) {
+ this.preparedResult = preparedResult;
+ }
+ public int getPreparedResult() {
+ return preparedResult;
+ }
+
+}
\ No newline at end of file
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 14 10:30:54 2006
@@ -28,6 +28,7 @@
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.state.Tracked;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
@@ -86,7 +87,10 @@
return;
}
if (command.isResponse()) {
- requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+ Object object = requestMap.remove(new Integer(((Response) command).getCorrelationId()));
+ if( object!=null && object.getClass() == Tracked.class ) {
+ ((Tracked)object).onResponses();
+ }
}
if (!initialized){
if (command.isBrokerInfo()){
@@ -132,6 +136,8 @@
public FailoverTransport() throws InterruptedIOException {
+ stateTracker.setTrackTransactions(true);
+
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
@@ -368,7 +374,10 @@
// the state tracker,
// then hold it in the requestMap so that we can replay
// it later.
- if (!stateTracker.track(command) && command.isResponseRequired()) {
+ Tracked tracked = stateTracker.track(command);
+ if( tracked!=null && tracked.isWaitingForResponse() ) {
+ requestMap.put(new Integer(command.getCommandId()), tracked);
+ } else if ( tracked==null && command.isResponseRequired()) {
requestMap.put(new Integer(command.getCommandId()), command);
}
@@ -376,13 +385,20 @@
try {
connectedTransport.oneway(command);
} catch (IOException e) {
- // If there is an IOException in the send, remove the command from the requestMap
- if (!stateTracker.track(command) && command.isResponseRequired()) {
- requestMap.remove(new Integer(command.getCommandId()), command);
- }
-
- // Rethrow the exception so it will handled by the outer catch
- throw e;
+
+ // If the command was not tracked.. we will retry in this method
+ if( tracked==null ) {
+
+ // since we will retry in this method.. take it out of the request
+ // map so that it is not sent 2 times on recovery
+ if( command.isResponseRequired() ) {
+ requestMap.remove(new Integer(command.getCommandId()));
+ }
+
+ // Rethrow the exception so it will handled by the outer catch
+ throw e;
+ }
+
}
return;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?view=diff&rev=443423&r1=443422&r2=443423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Sep 14 10:30:54 2006
@@ -340,7 +340,7 @@
// then hold it in the requestMap so that we can replay
// it later.
boolean fanout = isFanoutCommand(command);
- if (!stateTracker.track(command) && command.isResponseRequired() ) {
+ if (stateTracker.track(command)==null && command.isResponseRequired() ) {
int size = fanout ? minAckCount : 1;
requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
}