You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/08/06 16:21:41 UTC

[1/3] git commit: add amqp transport module to rar

Repository: activemq
Updated Branches:
  refs/heads/trunk 6bdce73d8 -> fff3c8397


add amqp transport module to rar


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fff3c839
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fff3c839
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fff3c839

Branch: refs/heads/trunk
Commit: fff3c839712a6fc11931236d9e2158cc25a770cf
Parents: 58ae402
Author: gtully <ga...@gmail.com>
Authored: Wed Aug 6 15:20:43 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 6 15:21:19 2014 +0100

----------------------------------------------------------------------
 activemq-rar/pom.xml                        | 4 ++++
 activemq-rar/src/main/rar/broker-config.xml | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fff3c839/activemq-rar/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-rar/pom.xml b/activemq-rar/pom.xml
index ec6fc4f..14c2f97 100644
--- a/activemq-rar/pom.xml
+++ b/activemq-rar/pom.xml
@@ -340,6 +340,10 @@
       <artifactId>commons-pool</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-amqp</artifactId>
+    </dependency>
+    <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/activemq/blob/fff3c839/activemq-rar/src/main/rar/broker-config.xml
----------------------------------------------------------------------
diff --git a/activemq-rar/src/main/rar/broker-config.xml b/activemq-rar/src/main/rar/broker-config.xml
index c47bfa8..36856cd 100644
--- a/activemq-rar/src/main/rar/broker-config.xml
+++ b/activemq-rar/src/main/rar/broker-config.xml
@@ -37,7 +37,7 @@
      </persistenceAdapter>
 
     <transportConnectors>
-      <transportConnector uri="tcp://localhost:61616"/>
+      <transportConnector name="openwire" uri="tcp://localhost:61616"/>
     </transportConnectors>
     
   </broker>


[3/3] git commit: https://issues.apache.org/jira/browse/AMQ-4930 - ensure we page in messages for browse/expire when destination stats are disabled via config

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-4930 - ensure we page in messages for browse/expire when destination stats are disabled via config


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41659725
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41659725
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41659725

Branch: refs/heads/trunk
Commit: 41659725f4c4fa027386148077aa76c31d8853af
Parents: 6bdce73
Author: gtully <ga...@gmail.com>
Authored: Tue Aug 5 16:32:44 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 6 15:21:19 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 12 ++++++++--
 .../org/apache/activemq/bugs/AMQ4930Test.java   | 24 +++++++++++++++-----
 2 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 647ba68..3cdd91c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1232,9 +1232,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         } finally {
             pagedInMessagesLock.readLock().unlock();
         }
-        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, destinationStatistics.getMessages().getCount(), memoryUsage.getPercentUsage()});
+        int messagesInQueue = 0;
+        messagesLock.readLock().lock();
+        try {
+            messagesInQueue = messages.size();
+        } finally {
+            messagesLock.readLock().unlock();
+        }
+
+        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
         return (alreadyPagedIn < max)
-                && (alreadyPagedIn < destinationStatistics.getMessages().getCount())
+                && (alreadyPagedIn < messagesInQueue)
                 && messages.hasSpace();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index f75eae3..e6bea2a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +38,7 @@ public class AMQ4930Test extends TestCase {
     private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
     final int messageCount = 150;
     final int messageSize = 1024*1024;
+    final int maxBrowsePageSize = 50;
     final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
     BrokerService broker;
     ActiveMQConnectionFactory factory;
@@ -50,8 +52,8 @@ public class AMQ4930Test extends TestCase {
         PolicyEntry policy = new PolicyEntry();
         // disable expriy processing as this will call browse in parallel
         policy.setExpireMessagesPeriod(0);
-        policy.setMaxPageSize(50);
-        policy.setMaxBrowsePageSize(50);
+        policy.setMaxPageSize(maxBrowsePageSize);
+        policy.setMaxBrowsePageSize(maxBrowsePageSize);
         pMap.setDefaultEntry(policy);
 
         broker.setDestinationPolicy(pMap);
@@ -65,6 +67,11 @@ public class AMQ4930Test extends TestCase {
         doTestBrowsePending(DeliveryMode.PERSISTENT);
     }
 
+    public void testWithStatsDisabled() throws Exception {
+        ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
+        doTestBrowsePending(DeliveryMode.PERSISTENT);
+    }
+
     public void doTestBrowsePending(int deliveryMode) throws Exception {
 
         Connection connection = factory.createConnection();
@@ -77,7 +84,6 @@ public class AMQ4930Test extends TestCase {
 
         for (int i = 0; i < messageCount; i++) {
             producer.send(bigQueue, bytesMessage);
-            LOG.info("Sent: " + i);
         }
 
         final QueueViewMBean queueViewMBean = (QueueViewMBean)
@@ -94,15 +100,21 @@ public class AMQ4930Test extends TestCase {
         final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
 
         // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
-        underTest.browse();
-        underTest.browse();
+        Message[] browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+        browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
         Runtime.getRuntime().gc();
         long free = Runtime.getRuntime().freeMemory()/1024;
         LOG.info("free at start of check: " + free);
         // check for memory growth
         for (int i=0; i<10; i++) {
             LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
-            underTest.browse();
+            browsed = underTest.browse();
+            LOG.info("Browsed: " + browsed.length);
+            assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
             Runtime.getRuntime().gc();
             Runtime.getRuntime().gc();
             assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));


[2/3] git commit: https://issues.apache.org/jira/browse/AMQ-5016 - evolve the class in a serialization compat way so that existing kahadb stores can be read, long field addition

Posted by gt...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5016 - evolve the class in a serialization compat way so that existing kahadb stores can be read, long field addition


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/58ae402b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/58ae402b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/58ae402b

Branch: refs/heads/trunk
Commit: 58ae402b1d61e74511cfd29e1aea7e9597b3a76b
Parents: 4165972
Author: gtully <ga...@gmail.com>
Authored: Wed Aug 6 15:19:50 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Wed Aug 6 15:21:19 2014 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/util/BitArrayBin.java   | 24 ++++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/58ae402b/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
index 528144e..d988ae1 100755
--- a/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/BitArrayBin.java
@@ -29,9 +29,9 @@ public class BitArrayBin implements Serializable {
     private static final long serialVersionUID = 1L;
     private final LinkedList<BitArray> list;
     private int maxNumberOfArrays;
-    private long firstIndex = -1;
+    private int firstIndex = -1;  // leave 'int' for old serialization compatibility and introduce new 'long' field
     private long lastInOrderBit=-1;
-
+    private long longFirstIndex=-1;
     /**
      * Create a BitArrayBin to a certain window size (number of messages to
      * keep)
@@ -90,7 +90,7 @@ public class BitArrayBin implements Serializable {
      * @return true/false
      */
     public boolean getBit(long index) {
-        boolean answer = index >= firstIndex;
+        boolean answer = index >= longFirstIndex;
         BitArray ba = getBitArray(index);
         if (ba != null) {
             int offset = getOffset(index);
@@ -119,7 +119,7 @@ public class BitArrayBin implements Serializable {
                 int overShoot = bin - maxNumberOfArrays + 1;
                 while (overShoot > 0) {
                     list.removeFirst();
-                    firstIndex += BitArray.LONG_SIZE;
+                    longFirstIndex += BitArray.LONG_SIZE;
                     list.add(new BitArray());
                     overShoot--;
                 }
@@ -143,10 +143,10 @@ public class BitArrayBin implements Serializable {
      */
     private int getBin(long index) {
         int answer = 0;
-        if (firstIndex < 0) {
-            firstIndex = (int) (index - (index % BitArray.LONG_SIZE));
-        } else if (firstIndex >= 0) {
-            answer = (int)((index - firstIndex) / BitArray.LONG_SIZE);
+        if (longFirstIndex < 0) {
+            longFirstIndex = (int) (index - (index % BitArray.LONG_SIZE));
+        } else if (longFirstIndex >= 0) {
+            answer = (int)((index - longFirstIndex) / BitArray.LONG_SIZE);
         }
         return answer;
     }
@@ -159,8 +159,8 @@ public class BitArrayBin implements Serializable {
      */
     private int getOffset(long index) {
         int answer = 0;
-        if (firstIndex >= 0) {
-            answer = (int)((index - firstIndex) - (BitArray.LONG_SIZE * getBin(index)));
+        if (longFirstIndex >= 0) {
+            answer = (int)((index - longFirstIndex) - (BitArray.LONG_SIZE * getBin(index)));
         }
         return answer;
     }
@@ -168,8 +168,8 @@ public class BitArrayBin implements Serializable {
     public long getLastSetIndex() {
         long result = -1;
 
-        if (firstIndex >=0) {
-            result = firstIndex;
+        if (longFirstIndex >=0) {
+            result = longFirstIndex;
             BitArray last = null;
             for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
                 last = list.get(lastBitArrayIndex);