You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/12/05 13:49:56 UTC

[1/2] activemq git commit: [AMQ-6522] - remove hardcoded 32MB batch limit from recovery check of the journal, fix and test

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 354265754 -> 5b6ce12fc


[AMQ-6522] - remove hardcoded 32MB batch limit from recovery check of the journal, fix and test

(cherry picked from commit dad629e889b2116a778fd4f77680a1b2944b400f)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/89209d32
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/89209d32
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/89209d32

Branch: refs/heads/activemq-5.14.x
Commit: 89209d320cf5bf0c6e0755f6ba4dd33d73da5398
Parents: 3542657
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 29 11:32:03 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Dec 5 08:45:20 2016 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |   8 +-
 .../store/kahadb/disk/journal/Journal.java      |   4 +-
 .../org/apache/activemq/bugs/AMQ6522Test.java   | 117 +++++++++++++++++++
 3 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/89209d32/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8c79c66..c46a127 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -935,11 +935,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         if (!missingPredicates.isEmpty()) {
             for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                 final StoredDestination sd = sdEntry.getValue();
-                final ArrayList<Long> matches = new ArrayList<Long>();
+                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
                 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                     @Override
                     protected void matched(Location key, Long value) {
-                        matches.add(value);
+                        matches.put(value, key);
                     }
                 });
 
@@ -950,7 +950,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     // we error out.
                     if( ignoreMissingJournalfiles ) {
                         // Update the index to remove the references to the missing data
-                        for (Long sequenceId : matches) {
+                        for (Long sequenceId : matches.keySet()) {
                             MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                             sd.locationIndex.remove(tx, keys.location);
                             sd.messageIdIndex.remove(tx, keys.messageId);
@@ -960,7 +960,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }
                     } else {
-                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
+                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                         throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                     }
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/89209d32/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index e526698..5db5df7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -65,8 +65,6 @@ public class Journal {
     public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
     public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
 
-    private static final int MAX_BATCH_SIZE = 32*1024*1024;
-
     private static final int PREALLOC_CHUNK_SIZE = 1024*1024;
 
     // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
@@ -553,7 +551,7 @@ public class Journal {
             }
 
             int size = controlIs.readInt();
-            if (size > MAX_BATCH_SIZE) {
+            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                 return -1;
             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/89209d32/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
new file mode 100644
index 0000000..286788d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ6522Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ6522Test.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private final Destination destination = new ActiveMQQueue("large_message_queue");
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        initBroker(true);
+    }
+
+    public void initBroker(Boolean deleteAllMessages) throws Exception {
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setUseCache(false);
+        broker.setDestinationPolicy(new PolicyMap());
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setCheckForCorruptJournalFiles(true);
+        kahadb.setPreallocationScope(Journal.PreallocationScope.NONE.name());
+
+        broker.setPersistenceAdapter(kahadb);
+        broker.setUseJmx(false);
+
+        return broker;
+    }
+
+
+    @Test
+    public void verifyMessageExceedsJournalRestartRecoveryCheck() throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer producer = session.createProducer(destination);
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[33*1024*1024]);
+            producer.send(message);
+
+        } finally {
+            connection.close();
+        }
+
+        tearDown();
+        initBroker(false);
+
+        connection = connectionFactory.createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            assertNotNull("Got message after restart", consumer.receive(20000));
+        } finally {
+            connection.close();
+        }
+    }
+}
\ No newline at end of file


[2/2] activemq git commit: [AMQ-6518] - fix up averageMessageSize destination stat and test

Posted by cs...@apache.org.
[AMQ-6518] - fix up averageMessageSize destination stat and test

(cherry picked from commit cfdff4edc529984deea27067578810498541321c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5b6ce12f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5b6ce12f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5b6ce12f

Branch: refs/heads/activemq-5.14.x
Commit: 5b6ce12fc473569e7de3c307330ff11e158e49ad
Parents: 89209d3
Author: gtully <ga...@gmail.com>
Authored: Fri Nov 25 10:37:58 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Dec 5 08:45:56 2016 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/activemq/plugin/StatisticsBroker.java | 2 +-
 .../org/apache/activemq/plugin/BrokerStatisticsPluginTest.java     | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5b6ce12f/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
index 9e41138..7476c3e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
@@ -120,7 +120,7 @@ public class StatisticsBroker extends BrokerFilter {
                         statsMessage.setLong("inflightCount", stats.getInflight().getCount());
                         statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
                         // we are okay with the size without decimals so cast to long
-                        statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAveragePerSecond());
+                        statsMessage.setLong("averageMessageSize", (long) stats.getMessageSize().getAverageSize());
                         statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
                         statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
                         statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());

http://git-wip-us.apache.org/repos/asf/activemq/blob/5b6ce12f/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
index b2e4bdd..b003a16 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
@@ -117,6 +117,8 @@ public class BrokerStatisticsPluginTest extends TestCase{
         assertTrue(reply.getMapNames().hasMoreElements());
         assertTrue(reply.getJMSTimestamp() > 0);
         assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
+        assertTrue(reply.getLong("averageMessageSize") > 0);
+
         /*
         for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
             String name = e.nextElement().toString();