You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/10/19 05:54:46 UTC
svn commit: r586251 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Author: jlim
Date: Thu Oct 18 20:54:45 2007
New Revision: 586251
URL: http://svn.apache.org/viewvc?rev=586251&view=rev
Log:
applied patch for AMQ-1440 and AMQ-1439
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=586251&r1=586250&r2=586251&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Oct 18 20:54:45 2007
@@ -301,6 +301,14 @@
reconnectTask.shutdown();
}
+ public int getMinAckCount() {
+ return minAckCount;
+ }
+
+ public void setMinAckCount(int minAckCount) {
+ this.minAckCount = minAckCount;
+ }
+
public long getInitialReconnectDelay() {
return initialReconnectDelay;
}
@@ -338,24 +346,14 @@
try {
synchronized (reconnectMutex) {
- // If it was a request and it was not being tracked by
- // the state tracker,
- // then hold it in the requestMap so that we can replay
- // it later.
- boolean fanout = isFanoutCommand(command);
- if (stateTracker.track(command) == null && command.isResponseRequired()) {
- int size = fanout ? minAckCount : 1;
- requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
- }
-
// Wait for transport to be connected.
- while (connectedCount != minAckCount && !disposed && connectionFailure == null) {
+ while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
reconnectMutex.wait(1000);
}
// Still not fully connected.
- if (connectedCount != minAckCount) {
+ if (connectedCount < minAckCount) {
Exception error;
@@ -374,6 +372,16 @@
throw IOExceptionSupport.create(error);
}
+ // If it was a request and it was not being tracked by
+ // the state tracker,
+ // then hold it in the requestMap so that we can replay
+ // it later.
+ boolean fanout = isFanoutCommand(command);
+ if (stateTracker.track(command) == null && command.isResponseRequired()) {
+ int size = fanout ? minAckCount : 1;
+ requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
+ }
+
// Send the message.
if (fanout) {
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
@@ -543,4 +551,5 @@
public boolean isFaultTolerant() {
return true;
}
+
}