You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/11/29 11:33:18 UTC
activemq git commit: [AMQ-6522] - remove hardcoded 32k batch limit from recovery check of the journal, fix and test
Repository: activemq
Updated Branches:
refs/heads/master 0a29533ed -> dad629e88
[AMQ-6522] - remove hardcoded 32k batch limit from recovery check of the journal, fix and test
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/dad629e8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/dad629e8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/dad629e8
Branch: refs/heads/master
Commit: dad629e889b2116a778fd4f77680a1b2944b400f
Parents: 0a29533
Author: gtully <ga...@gmail.com>
Authored: Tue Nov 29 11:32:03 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Nov 29 11:32:03 2016 +0000
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 8 +-
.../store/kahadb/disk/journal/Journal.java | 4 +-
.../org/apache/activemq/bugs/AMQ6522Test.java | 117 +++++++++++++++++++
3 files changed, 122 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8c79c66..c46a127 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -935,11 +935,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (!missingPredicates.isEmpty()) {
for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
final StoredDestination sd = sdEntry.getValue();
- final ArrayList<Long> matches = new ArrayList<Long>();
+ final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
@Override
protected void matched(Location key, Long value) {
- matches.add(value);
+ matches.put(value, key);
}
});
@@ -950,7 +950,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// we error out.
if( ignoreMissingJournalfiles ) {
// Update the index to remove the references to the missing data
- for (Long sequenceId : matches) {
+ for (Long sequenceId : matches.keySet()) {
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
@@ -960,7 +960,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
- LOG.error("[" + sdEntry.getKey() + "] references corrupt locations. " + matches.size() + " messages affected.");
+ LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 65a952b..548928e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -65,8 +65,6 @@ public class Journal {
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
- private static final int MAX_BATCH_SIZE = 32*1024*1024;
-
private static final int PREALLOC_CHUNK_SIZE = 1024*1024;
// ITEM_HEAD_SPACE = length + type+ reserved space + SOR
@@ -570,7 +568,7 @@ public class Journal {
}
int size = controlIs.readInt();
- if (size > MAX_BATCH_SIZE) {
+ if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
return -1;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/dad629e8/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
new file mode 100644
index 0000000..286788d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6522Test.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ6522Test { // Regression test for AMQ-6522: stores one 33 MiB message, restarts the broker on the same store, and expects the message to still be consumable.
+
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ6522Test.class); // NOTE(review): not referenced anywhere else in this class.
+
+ private BrokerService broker; // Broker under test; reassigned by every initBroker call.
+ private ActiveMQConnectionFactory connectionFactory; // Client factory bound to the current broker's connector URI.
+ private final Destination destination = new ActiveMQQueue("large_message_queue"); // Queue used for both the send and the receive.
+ private String connectionUri; // Publishable connect string of the connector added in initBroker.
+
+ @Before
+ public void setUp() throws Exception { // Starts the first broker with delete-all-messages-on-startup enabled.
+ initBroker(true);
+ }
+
+ public void initBroker(Boolean deleteAllMessages) throws Exception { // Creates, configures and starts a broker. NOTE(review): boxed Boolean looks unnecessary; both callers pass literals - confirm primitive boolean would do.
+ broker = createBroker();
+ broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); // Port 0: presumably lets the transport pick a free port, hence the URI is read back - TODO confirm.
+ broker.start();
+ connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+ }
+
+ @After
+ public void tearDown() throws Exception { // Stops the current broker and blocks until it reports stopped; also called directly by the test to restart.
+ broker.stop(); // NOTE(review): no null guard; if createBroker threw during setUp this would presumably NPE - verify against JUnit lifecycle.
+ broker.waitUntilStopped();
+ }
+
+ protected BrokerService createBroker() throws Exception { // Builds an unstarted broker backed by a KahaDB persistence adapter.
+ BrokerService broker = new BrokerService(); // Local variable shadows the field of the same name; the field is assigned by the caller.
+
+ PolicyEntry policy = new PolicyEntry();
+ policy.setUseCache(false); // Default destination policy with useCache switched off.
+ broker.setDestinationPolicy(new PolicyMap());
+ broker.getDestinationPolicy().setDefaultEntry(policy);
+
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setCheckForCorruptJournalFiles(true); // Turns on the corrupt-journal-file check option on the adapter.
+ kahadb.setPreallocationScope(Journal.PreallocationScope.NONE.name()); // Preallocation scope set to NONE.
+
+ broker.setPersistenceAdapter(kahadb);
+ broker.setUseJmx(false);
+
+ return broker;
+ }
+
+
+ @Test
+ public void verifyMessageExceedsJournalRestartRecoveryCheck() throws Exception { // Send a large message, restart the broker keeping the store, then receive.
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Non-transacted, auto-acknowledge session.
+
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[33*1024*1024]); // 33 MiB payload: larger than the 32*1024*1024 value of the MAX_BATCH_SIZE constant removed from Journal.java.
+ producer.send(message);
+
+ } finally {
+ connection.close();
+ }
+
+ tearDown(); // Stop the first broker...
+ initBroker(false); // ...and start a new one without delete-all-messages-on-startup, presumably so the existing store is recovered.
+
+ connection = connectionFactory.createConnection(); // connectionFactory was replaced by initBroker, so this targets the new broker.
+ connection.start();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+ assertNotNull("Got message after restart", consumer.receive(20000)); // Waits up to 20000 ms; a null result fails the test.
+ } finally {
+ connection.close();
+ }
+ }
+}
\ No newline at end of file