You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/06 22:49:00 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5340

Repository: activemq
Updated Branches:
  refs/heads/master 3ba28f622 -> c38a61d7a


https://issues.apache.org/jira/browse/AMQ-5340

Clean up a bit, extend test timeout to account for slow CI machines,
remove System prints and replace with LOG.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c38a61d7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c38a61d7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c38a61d7

Branch: refs/heads/master
Commit: c38a61d7aeec46847c0a658d7e39ac85181d5251
Parents: 3ba28f6
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 6 16:47:58 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 6 16:47:58 2015 -0400

----------------------------------------------------------------------
 .../broker/region/PrefetchSubscription.java     |  6 +-
 .../activemq/ActiveMQMessageConsumer.java       | 90 ++++++++------------
 .../activemq/JmsQueueBrowserExpirationTest.java | 28 +++---
 3 files changed, 48 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c38a61d7/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 7d91ba6..2a63c33 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -648,17 +648,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         synchronized(dispatchLock) {
                             pending.remove();
                             node.decrementReferenceCount();
-                            if( !isDropped(node) && canDispatch(node)) {
+                            if (!isDropped(node) && canDispatch(node)) {
 
                                 // Message may have been sitting in the pending
                                 // list a while waiting for the consumer to ak the message.
-                                if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
+                                if (node != QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
                                     //increment number to dispatch
                                     numberToDispatch++;
                                     if (broker.isExpired(node)) {
                                         ((Destination)node.getRegionDestination()).messageExpired(context, this, node);
                                     }
-                                    //AMQ-5340
+
                                     if (!isBrowser()) {
                                         continue;
                                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c38a61d7/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index d53f7b6..82a1bb4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -492,20 +492,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     }
                 } else if (md.getMessage() == null) {
                     return null;
-                //AMQ-5340 - only check for expired if not a browser
-                } else if (!isBrowser() && isConsumerExpiryCheckEnabled() && md.getMessage().isExpired()) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(getConsumerId() + " received expired message: " + md);
-                    }
+                } else if (consumeExpiredMessage(md)) {
+                    LOG.debug("{} received expired message: {}", getConsumerId(), md);
                     beforeMessageIsConsumed(md);
                     afterMessageIsConsumed(md, true);
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
                 } else if (redeliveryExceeded(md)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(getConsumerId() + " received with excessive redelivered: " + md);
-                    }
+                    LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
                     posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                 } else {
                     if (LOG.isTraceEnabled()) {
@@ -520,6 +515,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         }
     }
 
+    private boolean consumeExpiredMessage(MessageDispatch dispatch) {
+        if (dispatch.getMessage().isExpired()) {
+            return !isBrowser() && isConsumerExpiryCheckEnabled();
+        }
+
+        return false;
+    }
+
     private void posionAck(MessageDispatch md, String cause) throws JMSException {
         MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
         posionAck.setFirstMessageId(md.getMessage().getMessageId());
@@ -721,9 +724,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         boolean interrupted = Thread.interrupted();
         dispose();
         RemoveInfo removeCommand = info.createRemoveCommand();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("remove: " + this.getConsumerId() + ", lastDeliveredSequenceId:" + lastDeliveredSequenceId);
-        }
+        LOG.debug("remove: {}, lastDeliveredSequenceId: {}", getConsumerId(), lastDeliveredSequenceId);
         removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
         this.session.asyncSendPacket(removeCommand);
         if (interrupted) {
@@ -741,9 +742,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         if (inProgressClearRequiredFlag.get() > 0) {
             synchronized (unconsumedMessages.getMutex()) {
                 if (inProgressClearRequiredFlag.get() > 0) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(getConsumerId() + " clearing unconsumed list (" + unconsumedMessages.size() + ") on transport interrupt");
-                    }
+                    LOG.debug("{} clearing unconsumed list ({}) on transport interrupt", getConsumerId(), unconsumedMessages.size());
                     // ensure unconsumed are rolledback up front as they may get redelivered to another consumer
                     List<MessageDispatch> list = unconsumedMessages.removeAll();
                     if (!this.info.isBrowser()) {
@@ -849,9 +848,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             if (!this.info.isBrowser()) {
                 for (MessageDispatch old : list) {
                     // ensure we don't filter this as a duplicate
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("on close, rollback duplicate: " + old.getMessage().getMessageId());
-                    }
+                    LOG.debug("on close, rollback duplicate: {}", old.getMessage().getMessageId());
                     session.connection.rollbackDuplicate(this, old.getMessage());
                 }
             }
@@ -1000,8 +997,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
      */
     private MessageAck makeAckForAllDeliveredMessages(byte type) {
         synchronized (deliveredMessages) {
-            if (deliveredMessages.isEmpty())
+            if (deliveredMessages.isEmpty()) {
                 return null;
+            }
 
             MessageDispatch md = deliveredMessages.getFirst();
             MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
@@ -1030,23 +1028,17 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         } else {
             // old pending ack being superseded by ack of another type, if is is not a delivered
             // ack and hence important, send it now so it is not lost.
-            if ( !oldPendingAck.isDeliveredAck()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sending old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                }
+            if (!oldPendingAck.isDeliveredAck()) {
+                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
                 session.sendAck(oldPendingAck);
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("dropping old pending ack " + oldPendingAck + ", new pending: " + pendingAck);
-                }
+                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, pendingAck);
             }
         }
         // AMQ-3956 evaluate both expired and normal msgs as
         // otherwise consumer may get stalled
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("ackLater: sending: " + pendingAck);
-            }
+            LOG.debug("ackLater: sending: {}", pendingAck);
             session.sendAck(pendingAck);
             pendingAck=null;
             deliveredCounter = 0;
@@ -1100,8 +1092,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
             MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
-            if (ack == null)
+            if (ack == null) {
                 return; // no msgs
+            }
 
             if (session.getTransacted()) {
                 rollbackOnFailedRecoveryRedelivery();
@@ -1138,8 +1131,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                     }
                 }
                 if (numberNotReplayed > 0) {
-                    LOG.info("waiting for redelivery of " + numberNotReplayed + " in transaction: "
-                            + previouslyDeliveredMessages.transactionId +  ", to consumer :" + this.getConsumerId());
+                    LOG.info("waiting for redelivery of {} in transaction: {}, to consumer: {}",
+                             numberNotReplayed, this.getConsumerId(), previouslyDeliveredMessages.transactionId);
                     try {
                         Thread.sleep(Math.max(500, failoverRedeliveryWaitPeriod/4));
                     } catch (InterruptedException outOfhere) {
@@ -1161,11 +1154,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
             for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
                 if (!entry.getValue()) {
                     numberNotReplayed++;
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("previously delivered message has not been replayed in transaction: "
-                                + previouslyDeliveredMessages.transactionId
-                                + " , messageId: " + entry.getKey());
-                    }
+                    LOG.debug("previously delivered message has not been replayed in transaction: {}, messageId: {}",
+                              previouslyDeliveredMessages.transactionId, entry.getKey());
                 }
             }
             if (numberNotReplayed > 0) {
@@ -1338,9 +1328,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
         if (previouslyDeliveredMessages != null) {
             for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet()) {
                 if (!entry.getValue()) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("rollback non redelivered: " + entry.getKey());
-                    }
+                    LOG.trace("rollback non redelivered: {}" + entry.getKey());
                     removeFromDeliveredMessages(entry.getKey());
                 }
             }
@@ -1396,7 +1384,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                                 }
                                 afterMessageIsConsumed(md, expired);
                             } catch (RuntimeException e) {
-                                LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
+                                LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e);
                                 if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                     // schedual redelivery and possible dlq processing
                                     md.setRollbackCause(e);
@@ -1421,9 +1409,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                         // deal with duplicate delivery
                         ConsumerId consumerWithPendingTransaction;
                         if (redeliveryExpectedInCurrentTransaction(md, true)) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
-                            }
+                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
                             if (transactedIndividualAck) {
                                 immediateIndividualTransactedAck(md);
                             } else {
@@ -1490,15 +1476,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                             for (MessageDispatch delivered : deliveredMessages) {
                                 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
                             }
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " tracking existing transacted " + previouslyDeliveredMessages.transactionId +
-                                        " delivered list (" + deliveredMessages.size() + ") on transport interrupt");
-                            }
+                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
+                                      getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
                         } else {
                             if (session.isClientAcknowledge()) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug(getConsumerId() + " rolling back delivered list (" + deliveredMessages.size() + ") on transport interrupt");
-                                }
+                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                                 // allow redelivery
                                 if (!this.info.isBrowser()) {
                                     for (MessageDispatch md: deliveredMessages) {
@@ -1506,9 +1488,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                                     }
                                 }
                             }
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " clearing delivered list (" + deliveredMessages.size() + ") on transport interrupt");
-                            }
+                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                             deliveredMessages.clear();
                             pendingAck = null;
                         }
@@ -1608,9 +1588,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
                 public void run() {
                     try {
                         if (optimizeAcknowledge && !unconsumedMessages.isClosed()) {
-                            if (LOG.isInfoEnabled()) {
-                                LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
-                            }
+                            LOG.info("Consumer:{} is performing scheduled delivery of outstanding optimized Acks", info.getConsumerId());
                             deliverAcks();
                         }
                     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/c38a61d7/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
index 801c80e..1f4fff2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
@@ -58,8 +58,7 @@ public class JmsQueueBrowserExpirationTest {
     // Message expires after 1 second
     private static final long TTL = 1000;
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(JmsQueueBrowserExpirationTest.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserExpirationTest.class);
 
     private BrokerService broker;
     private URI connectUri;
@@ -81,12 +80,13 @@ public class JmsQueueBrowserExpirationTest {
 
     @After
     public void stopBroker() throws Exception {
-        broker.stop();
-        broker.waitUntilStopped();
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
     }
 
-    //This should finish in under 3 seconds because the messages should be expired
-    @Test(timeout=3000)
+    @Test(timeout=10000)
     public void testBrowsingExpiration() throws JMSException, InterruptedException {
 
         sendTestMessages();
@@ -106,19 +106,14 @@ public class JmsQueueBrowserExpirationTest {
             // Give JMS threads more opportunity to do their work.
             Thread.sleep(100);
             browsed = browse(queue, browserConnection);
-            String time =
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin)
-                            + " ms";
-            System.out.println("[" + time + "] found " + browsed + " messages");
+            LOG.info("[{}ms] found {}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin), browsed);
         }
-        System.out.println("Finished");
+        LOG.info("Finished");
         browserConnection.close();
     }
 
-    private int browse(ActiveMQQueue queue, Connection connection)
-            throws JMSException {
-        Session session =
-                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    private int browse(ActiveMQQueue queue, Connection connection) throws JMSException {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueBrowser browser = session.createBrowser(queue);
         Enumeration<?> enumeration = browser.getEnumeration();
         int browsed = 0;
@@ -146,8 +141,7 @@ public class JmsQueueBrowserExpirationTest {
             producer.send(prodSession.createTextMessage(msgStr));
             LOG.info("P&C: {}", msgStr);
         }
+
         prodSession.close();
     }
-
-
 }
\ No newline at end of file