You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/10/26 10:36:14 UTC

activemq git commit: [AMQ-6849] - fix sendFailIfNoSpaceAfterTimeout policy entry default value

Repository: activemq
Updated Branches:
  refs/heads/master 6da08b245 -> 8e576be1d


[AMQ-6849] - fix sendFailIfNoSpaceAfterTimeout policy entry default value


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e576be1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e576be1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e576be1

Branch: refs/heads/master
Commit: 8e576be1d9719367aa209562a128d489f27cdd01
Parents: 6da08b2
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 26 11:36:03 2017 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 26 11:36:03 2017 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/broker/region/Queue.java     |  2 +-
 .../activemq/broker/region/policy/PolicyEntry.java   |  2 +-
 .../apache/activemq/bugs/DuplicateFromStoreTest.java | 15 +++++++++------
 3 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8e576be1/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 3501ca4..f440f76 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -206,7 +206,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                             if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
                                 ExceptionResponse response = new ExceptionResponse(
                                         new ResourceAllocationException(
-                                                "Usage Manager Memory Limit reached. Stopping producer ("
+                                                "Usage Manager Memory Limit Wait Timeout. Stopping producer ("
                                                         + timeout.message.getProducerId()
                                                         + ") to prevent flooding "
                                                         + getActiveMQDestination().getQualifiedName()

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e576be1/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 8071e57..c5e5980 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -314,7 +314,7 @@ public class PolicyEntry extends DestinationMapEntry {
         if (sendFailIfNoSpace != -1) {
             destination.getSystemUsage().setSendFailIfNoSpace(isSendFailIfNoSpace());
         }
-        if (sendFailIfNoSpaceAfterTimeout != 0) {
+        if (sendFailIfNoSpaceAfterTimeout != -1) {
             destination.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(getSendFailIfNoSpaceAfterTimeout());
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e576be1/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
index 5854fc7..f7edebd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
@@ -66,6 +66,8 @@ public class DuplicateFromStoreTest {
     public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
 
     public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
+    public AtomicInteger totalMessagesSent = new AtomicInteger(NUM_MSGS);
+
     public AtomicInteger totalReceived = new AtomicInteger(0);
 
     public int messageSize = 16*1000;
@@ -92,7 +94,7 @@ public class DuplicateFromStoreTest {
 
         // configure <systemUsage>
         MemoryUsage memoryUsage = new MemoryUsage();
-        memoryUsage.setPercentOfJvmHeap(70);
+        memoryUsage.setPercentOfJvmHeap(50);
 
         StoreUsage storeUsage = new StoreUsage();
         storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
@@ -133,7 +135,7 @@ public class DuplicateFromStoreTest {
 
         LOG.info("All producers and consumers got started. Awaiting their termination");
         producersFinished.await(100, TimeUnit.MINUTES);
-        LOG.info("All producers have terminated.");
+        LOG.info("All producers have terminated. remaining to send: " + totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());
 
         consumersFinished.await(100, TimeUnit.MINUTES);
         LOG.info("All consumers have terminated.");
@@ -232,6 +234,7 @@ public class DuplicateFromStoreTest {
                 // send message
                 while (totalMessagesToSend.decrementAndGet() >= 0) {
                     producer.send(message);
+                    totalMessagesSent.incrementAndGet();
                     log.debug("Sent message: " + counter);
                     counter++;
 
@@ -241,7 +244,7 @@ public class DuplicateFromStoreTest {
                     Thread.sleep(PRODUCER_SLEEP);
                 }
             } catch (Exception ex) {
-                log.error(ex.getMessage());
+                log.error(ex.toString());
                 return;
             } finally {
                 try {
@@ -313,10 +316,10 @@ public class DuplicateFromStoreTest {
                         TextMessage textMessage = (TextMessage) message2;
                         String text = textMessage.getText();
                         log.debug("Received: " + text.substring(0, 50));
+                    } else if (totalReceived.get() < NUM_MSGS) {
+                        log.error("Received message of unsupported type. Expecting TextMessage. count: " + totalReceived.get());
                     } else {
-                        if (totalReceived.get() < NUM_MSGS) {
-                            log.error("Received message of unsupported type. Expecting TextMessage. " + message2);
-                        }
+                        // all done
                         break;
                     }
                     if (message2 != null) {