You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/10/19 05:54:46 UTC

svn commit: r586251 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Author: jlim
Date: Thu Oct 18 20:54:45 2007
New Revision: 586251

URL: http://svn.apache.org/viewvc?rev=586251&view=rev
Log:
Applied patch for AMQ-1440 and AMQ-1439.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java?rev=586251&r1=586250&r2=586251&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java Thu Oct 18 20:54:45 2007
@@ -301,6 +301,14 @@
         reconnectTask.shutdown();
     }
 
+	public int getMinAckCount() {
+		return minAckCount;
+	}
+
+	public void setMinAckCount(int minAckCount) {
+		this.minAckCount = minAckCount;
+	}    
+    
     public long getInitialReconnectDelay() {
         return initialReconnectDelay;
     }
@@ -338,24 +346,14 @@
         try {
             synchronized (reconnectMutex) {
 
-                // If it was a request and it was not being tracked by
-                // the state tracker,
-                // then hold it in the requestMap so that we can replay
-                // it later.
-                boolean fanout = isFanoutCommand(command);
-                if (stateTracker.track(command) == null && command.isResponseRequired()) {
-                    int size = fanout ? minAckCount : 1;
-                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
-                }
-
                 // Wait for transport to be connected.
-                while (connectedCount != minAckCount && !disposed && connectionFailure == null) {
+                while (connectedCount < minAckCount && !disposed && connectionFailure == null) {
                     LOG.debug("Waiting for at least " + minAckCount + " transports to be connected.");
                     reconnectMutex.wait(1000);
                 }
 
                 // Still not fully connected.
-                if (connectedCount != minAckCount) {
+                if (connectedCount < minAckCount) {
 
                     Exception error;
 
@@ -374,6 +372,16 @@
                     throw IOExceptionSupport.create(error);
                 }
 
+                // If it was a request and it was not being tracked by
+                // the state tracker,
+                // then hold it in the requestMap so that we can replay
+                // it later.
+                boolean fanout = isFanoutCommand(command);
+                if (stateTracker.track(command) == null && command.isResponseRequired()) {
+                    int size = fanout ? minAckCount : 1;
+                    requestMap.put(new Integer(command.getCommandId()), new RequestCounter(command, size));
+                }
+                
                 // Send the message.
                 if (fanout) {
                     for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
@@ -543,4 +551,5 @@
     public boolean isFaultTolerant() {
         return true;
     }
+
 }