You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:16:34 UTC

svn commit: r900385 - in /activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq: ActiveMQMessageConsumer.java broker/region/PrefetchSubscription.java transport/failover/FailoverTransport.java

Author: gtully
Date: Mon Jan 18 13:16:34 2010
New Revision: 900385

URL: http://svn.apache.org/viewvc?rev=900385&view=rev
Log:
merge -c 897988 https://svn.apache.org/repos/asf/activemq/trunk - reduce unmatched ack exceptions, tidy up prefetchExtension https://issues.apache.org/activemq/browse/AMQ-2560

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Mon Jan 18 13:16:34 2010
@@ -937,6 +937,9 @@
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
+        synchronized (unconsumedMessages.getMutex()) {
+            clearDispatchListOnReconnect();
+        }
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jan 18 13:16:34 2010
@@ -199,7 +199,7 @@
             }
         }
         if (LOG.isTraceEnabled()) {
-            LOG.trace("ack:" + ack);
+            LOG.info("ack:" + ack);
         }
         synchronized(dispatchLock) {
             if (ack.isStandardAck()) {
@@ -256,9 +256,6 @@
                                     prefetchExtension = Math.max(
                                             prefetchExtension, index );
                                 }
-                            } else {
-                                prefetchExtension = Math.max(0,
-                                        prefetchExtension - index);
                             }
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;
@@ -319,8 +316,6 @@
             } else if (ack.isRedeliveredAck()) {
                 // Message was re-delivered but it was not yet considered to be
                 // a DLQ message.
-                // Acknowledge all dispatched messages up till the message id of
-                // the ack.
                 boolean inAckRange = false;
                 for (final MessageReference node : dispatched) {
                     MessageId messageId = node.getMessageId();
@@ -349,9 +344,6 @@
                     throw new JMSException("Poison ack cannot be transacted: "
                             + ack);
                 }
-                // Acknowledge all dispatched messages up till the message id of
-                // the
-                // acknowledgment.
                 int index = 0;
                 boolean inAckRange = false;
                 List<MessageReference> removeList = new ArrayList<MessageReference>();

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=900385&r1=900384&r2=900385&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Jan 18 13:16:34 2010
@@ -201,26 +201,27 @@
             transport.setTransportListener(disposedListener);
             ServiceSupport.dispose(transport);
             
+            boolean reconnectOk = false;
             synchronized (reconnectMutex) {
-                boolean reconnectOk = false;
                 if(started) {
                     LOG.warn("Transport failed to " + connectedTransportURI+ " , attempting to automatically reconnect due to: " + e);
                     LOG.debug("Transport failed with the following exception:", e);
                     reconnectOk = true;
-                }
-                
+                }          
                 initialized = false;
                 failedConnectTransportURI=connectedTransportURI;
                 connectedTransportURI = null;
                 connected=false;
-                if(reconnectOk) {
+            
+                // notify before any reconnect attempt so ack state can be whacked
+                if (transportListener != null) {
+                    transportListener.transportInterupted();
+                }
+            
+                if (reconnectOk) {
                     reconnectTask.wakeup();
                 }
             }
-
-            if (transportListener != null) {
-                transportListener.transportInterupted();
-            }
         }
 
     }
@@ -412,8 +413,8 @@
                         // Skipping send of ShutdownInfo command when not connected.
                         return;
                     }
-                    if(command instanceof RemoveInfo) {
-                        // Simulate response to RemoveInfo command
+                    if(command instanceof RemoveInfo || command.isMessageAck()) {
+                        // Simulate response to RemoveInfo command or ack (as it will be stale)
                         stateTracker.track(command);
                         Response response = new Response();
                         response.setCorrelationId(command.getCommandId());
@@ -432,7 +433,7 @@
                         while (transport == null && !disposed
                                 && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted()) {
-                            LOG.trace("Waiting for transport to reconnect.");
+                            LOG.trace("Waiting for transport to reconnect..: " + command);
                             long end = System.currentTimeMillis();
                             if (timeout > 0 && (end - start > timeout)) {
                             	timedout = true;
@@ -698,7 +699,7 @@
                             t.setTransportListener(myTransportListener);
                             try {
                                 if (started) { 
-                                        restoreTransport(t);  
+                                    restoreTransport(t);  
                                 }
                                 reconnectDelay = initialReconnectDelay;
                                 failedConnectTransportURI=null;
@@ -856,7 +857,7 @@
 		                       bt.setTransport(t);
 		                       backups.add(bt);
 						   }
-					   }catch(Exception e) {
+					   } catch(Exception e) {
 						   LOG.debug("Failed to build backup ",e);
 					   }
 				   }