You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/16 16:31:29 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5703 - further tests and fixes.
Ensure early eof can be identified when checking for corruption and skip
corruption on replay when the checksum is invalid
Repository: activemq
Updated Branches:
refs/heads/master 4441001c4 -> 73db4d2bf
https://issues.apache.org/jira/browse/AMQ-5703 - further tests and fixes. Ensure early eof can be identified when checking for corruption and skip corruption on replay when the checksum is invalid
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73db4d2b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73db4d2b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73db4d2b
Branch: refs/heads/master
Commit: 73db4d2bfda10584b2ac8e21240e25b9ef4d85c8
Parents: 7dc522d
Author: gtully <ga...@gmail.com>
Authored: Thu Apr 16 14:58:45 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 16 15:25:58 2015 +0100
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 22 +-
.../store/kahadb/disk/journal/Journal.java | 24 +-
.../JournalCorruptionEofIndexRecoveryTest.java | 312 +++++++++++++++++++
3 files changed, 351 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3cc6879..32b7170 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -603,10 +603,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
int redoCounter = 0;
LOG.info("Recovering from the journal @" + recoveryPosition);
while (recoveryPosition != null) {
- JournalCommand<?> message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition, lastIndoubtPosition);
- redoCounter++;
+ try {
+ JournalCommand<?> message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition, lastIndoubtPosition);
+ redoCounter++;
+ } catch (IOException failedRecovery) {
+ if (isIgnoreMissingJournalfiles()) {
+ // track this dud location
+ journal.corruptRecoveryLocation(recoveryPosition);
+ } else {
+ throw failedRecovery;
+ }
+ }
recoveryPosition = journal.getNextLocation(recoveryPosition);
if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
@@ -826,8 +835,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
if (!missingPredicates.isEmpty()) {
- for (StoredDestination sd : storedDestinations.values()) {
-
+ for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
+ final StoredDestination sd = sdEntry.getValue();
final ArrayList<Long> matches = new ArrayList<Long>();
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
@Override
@@ -847,6 +856,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
sd.locationIndex.remove(tx, keys.location);
sd.messageIdIndex.remove(tx, keys.messageId);
+ LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
undoCounter++;
// TODO: do we need to modify the ack positions for the pub sub case?
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 394e3c0..8c82cf2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -55,6 +55,28 @@ public class Journal {
public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
+ // tackle corruption when checksum is disabled or corrupt with zeros, minimise data loss
+ /**
+  * Records the journal region starting at {@code recoveryPosition} as corrupt and moves
+  * {@code recoveryPosition} past it, so that replay via getNextLocation skips the damaged bytes.
+  * <p>
+  * Once corruption is detected during recovery the record contents cannot be trusted, so the
+  * region is taken to extend up to the next batch control record found after the corrupt offset,
+  * or to the end of the data file when no further batch record is found.
+  * <p>
+  * This is best effort: if scanning for the next batch record fails, the failure is logged and
+  * {@code recoveryPosition} is left unchanged.
+  *
+  * @param recoveryPosition the location whose record could not be loaded; updated in place
+  *                         (offset moved past the corrupt region, size reset to -1)
+  * @throws IOException if looking up or opening the data file for the location fails
+  */
+ public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
+     DataFile dataFile = getDataFile(recoveryPosition);
+     // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
+     DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+     try {
+         // search strictly after the corrupt offset for the next batch control record
+         int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
+         // a negative result means no later batch record: treat the rest of the file as corrupt
+         Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+         LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
+
+         // skip corruption on getNextLocation
+         recoveryPosition.setOffset((int) sequence.getLast() + 1);
+         recoveryPosition.setSize(-1);
+
+         dataFile.corruptedBlocks.add(sequence);
+     } catch (IOException e) {
+         // best effort: leave the position untouched so replay can continue, but do not hide the failure
+         LOG.warn("Unable to determine the extent of corrupt journal records in '" + dataFile.getFile()
+                 + "' from offset: " + recoveryPosition.getOffset() + ", recovery position left unchanged", e);
+     } finally {
+         accessorPool.closeDataFileAccessor(reader);
+     }
+ }
+
public enum PreallocationStrategy {
SPARSE_FILE,
OS_KERNEL_COPY,
@@ -301,7 +323,7 @@ public class Journal {
try {
while( true ) {
int size = checkBatchRecord(reader, location.getOffset());
- if ( size>=0 ) {
+ if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
} else {
http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
new file mode 100644
index 0000000..948b543
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+ /**
+  * Exercises KahaDB recovery after journal batch records have been corrupted, either with a batch
+  * size that points past the end of the data file or with an invalid batch checksum.
+  */
+ public class JournalCorruptionEofIndexRecoveryTest {
+
+     private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class);
+
+     ActiveMQConnectionFactory cf = null;
+     BrokerService broker = null;
+     private final Destination destination = new ActiveMQQueue("Test");
+     private String connectionUri;
+     private KahaDBPersistenceAdapter adapter;
+
+     /** Starts a broker over a freshly deleted persistence directory. */
+     protected void startBroker() throws Exception {
+         doStartBroker(true, false);
+     }
+
+     /** Restarts the broker over the existing data, optionally deleting the index file first. */
+     protected void restartBroker(boolean whackIndex) throws Exception {
+         restartBroker(whackIndex, false);
+     }
+
+     /**
+      * Stops the current broker (if any) and starts a new one over the same persistence data.
+      *
+      * @param whackIndex        delete the {@code db.data} index file before restarting
+      * @param forceRecoverIndex configure the adapter with {@code setForceRecoverIndex}
+      */
+     protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
+         File dataDir = null;
+         if (broker != null) {
+             // only read the data directory when there is a broker to read it from
+             dataDir = broker.getPersistenceAdapter().getDirectory();
+             broker.stop();
+             broker.waitUntilStopped();
+         }
+
+         if (whackIndex && dataDir != null) {
+             File indexToDelete = new File(dataDir, "db.data");
+             LOG.info("Whacking index: " + indexToDelete);
+             if (!indexToDelete.delete()) {
+                 LOG.warn("Failed to delete index: " + indexToDelete);
+             }
+         }
+
+         doStartBroker(false, forceRecoverIndex);
+     }
+
+     /**
+      * Creates, configures and starts a broker.
+      *
+      * @param delete            remove the persistence directory before starting
+      * @param forceRecoverIndex passed through to {@link #configurePersistence}
+      */
+     private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception {
+         broker = new BrokerService();
+         if (delete) {
+             IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory());
+             IOHelper.delete(broker.getPersistenceAdapter().getDirectory());
+         }
+
+         broker.setPersistent(true);
+         broker.setUseJmx(true);
+         broker.addConnector("tcp://localhost:0");
+
+         configurePersistence(broker, forceRecoverIndex);
+
+         connectionUri = "vm://localhost?create=false";
+         cf = new ActiveMQConnectionFactory(connectionUri);
+
+         broker.start();
+         LOG.info("Starting broker..");
+     }
+
+     /** Configures small journal files and enables corruption checking and missing-file tolerance. */
+     protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
+         adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+         adapter.setForceRecoverIndex(forceRecoverIndex);
+
+         // ensure there are a bunch of data files but multiple entries in each
+         adapter.setJournalMaxFileLength(1024 * 20);
+
+         // speed up the test case, checkpoint and cleanup early and often
+         adapter.setCheckpointInterval(5000);
+         adapter.setCleanupInterval(5000);
+
+         adapter.setCheckForCorruptJournalFiles(true);
+         adapter.setIgnoreMissingJournalfiles(true);
+     }
+
+     @After
+     public void tearDown() throws Exception {
+         if (broker != null) {
+             broker.stop();
+             broker.waitUntilStopped();
+         }
+     }
+
+     @Test
+     public void testRecoveryAfterCorruptionEof() throws Exception {
+         startBroker();
+
+         produceMessagesToConsumeMultipleDataFiles(50);
+
+         int numFiles = getNumberOfJournalFiles();
+
+         assertTrue("more than x files: " + numFiles, numFiles > 2);
+
+         corruptBatchEndEof(3);
+
+         restartBroker(false);
+
+         assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
+
+         assertEquals("Drain", 49, drainQueue(49));
+     }
+
+     @Test
+     public void testRecoveryAfterCorruptionCheckSum() throws Exception {
+         startBroker();
+
+         produceMessagesToConsumeMultipleDataFiles(4);
+
+         corruptBatchCheckSumSplash(1);
+
+         restartBroker(true);
+
+         assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount());
+
+         assertEquals("Drain", 3, drainQueue(4));
+     }
+
+     @Test
+     public void testRecoveryAfterCorruptionCheckSumExistingIndex() throws Exception {
+         startBroker();
+
+         produceMessagesToConsumeMultipleDataFiles(4);
+
+         corruptBatchCheckSumSplash(1);
+
+         restartBroker(false);
+
+         assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount());
+
+         assertEquals("Drain", 0, drainQueue(4));
+
+         // force recover index and lose one message
+         restartBroker(false, true);
+
+         assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount());
+
+         assertEquals("Drain", 3, drainQueue(4));
+     }
+
+     /**
+      * Zeroes the checksum of the second batch record in the first data file and shrinks the size of
+      * the location record that follows it by one byte.
+      *
+      * @param id used only in the log message; the first data file is always the one modified
+      */
+     private void corruptBatchCheckSumSplash(int id) throws Exception {
+         Collection<DataFile> files =
+                 ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+         DataFile dataFile = (DataFile) files.toArray()[0];
+         RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
+         try {
+             ArrayList<Integer> batchPositions = findBatch(randomAccessFile, Integer.MAX_VALUE);
+             LOG.info("Batch positions: " + batchPositions);
+             int pos = batchPositions.get(1);
+             LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
+             randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length + 4);
+             // whack the batch control record checksum
+             randomAccessFile.writeLong(0L);
+
+             // mod the data size in the location header so reading blows
+             randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_SIZE);
+             int size = randomAccessFile.readInt();
+             byte type = randomAccessFile.readByte();
+
+             LOG.info("Read: size:" + size + ", type:" + type);
+
+             randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_SIZE);
+             size -= 1;
+             LOG.info("rewrite incorrect location size @:" + (pos + Journal.BATCH_CONTROL_RECORD_SIZE) + " as: " + size);
+             randomAccessFile.writeInt(size);
+
+             randomAccessFile.getChannel().force(true);
+         } finally {
+             randomAccessFile.close();
+         }
+     }
+
+     /**
+      * In data file {@code id}, rewrites the third-from-last batch record with a 31MB size (past the
+      * end of the file) and a zero checksum.
+      */
+     private void corruptBatchEndEof(int id) throws Exception {
+         Collection<DataFile> files =
+                 ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+         DataFile dataFile = (DataFile) files.toArray()[id];
+         RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
+         try {
+             ArrayList<Integer> batchPositions = findBatch(randomAccessFile, Integer.MAX_VALUE);
+             int pos = batchPositions.get(batchPositions.size() - 3);
+             LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
+             randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length);
+             randomAccessFile.writeInt(31 * 1024 * 1024);
+             randomAccessFile.writeLong(0L);
+             randomAccessFile.getChannel().force(true);
+         } finally {
+             randomAccessFile.close();
+         }
+     }
+
+     /**
+      * Returns the offsets of up to {@code where} batch control record headers found in the first
+      * 20KB read from {@code randomAccessFile} at its current position.
+      */
+     private ArrayList<Integer> findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException {
+         final ArrayList<Integer> batchPositions = new ArrayList<Integer>();
+         final ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);
+         byte data[] = new byte[1024 * 20];
+
+         // read() returns -1 at eof; never hand a negative length to ByteSequence
+         int read = Math.max(0, randomAccessFile.read(data, 0, data.length));
+         ByteSequence bs = new ByteSequence(data, 0, read);
+
+         int pos = 0;
+         for (int i = 0; i < where; i++) {
+             int found = bs.indexOf(header, pos);
+             if (found == -1) {
+                 break;
+             }
+             batchPositions.add(found);
+             pos = found + Journal.BATCH_CONTROL_RECORD_HEADER.length - 1;
+         }
+
+         return batchPositions;
+     }
+
+     /** Counts the non-null entries in the journal's file map. */
+     private int getNumberOfJournalFiles() throws IOException {
+         Collection<DataFile> files =
+                 ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+         int reality = 0;
+         for (DataFile file : files) {
+             if (file != null) {
+                 reality++;
+             }
+         }
+         return reality;
+     }
+
+     /** Sends {@code numToSend} text messages over the broker's tcp connector; returns the number sent. */
+     private int produceMessages(Destination destination, int numToSend) throws Exception {
+         int sent = 0;
+         Connection connection = new ActiveMQConnectionFactory(
+                 broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+         try {
+             connection.start();
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageProducer producer = session.createProducer(destination);
+             for (int i = 0; i < numToSend; i++) {
+                 producer.send(createMessage(session, i));
+                 sent++;
+             }
+         } finally {
+             connection.close();
+         }
+
+         return sent;
+     }
+
+     private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+         return produceMessages(destination, numToSend);
+     }
+
+     final String payload = new String(new byte[1024]);
+
+     private Message createMessage(Session session, int i) throws Exception {
+         return session.createTextMessage(payload + "::" + i);
+     }
+
+     /** Consumes up to {@code max} messages, waiting up to 5s for each; returns the number received. */
+     private int drainQueue(int max) throws Exception {
+         Connection connection = cf.createConnection();
+         try {
+             connection.start();
+             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+             MessageConsumer consumer = session.createConsumer(destination);
+             int count = 0;
+             while (count < max && consumer.receive(5000) != null) {
+                 count++;
+             }
+             consumer.close();
+             return count;
+         } finally {
+             connection.close();
+         }
+     }
+ }
[2/2] activemq git commit: remove timing dependency
Posted by gt...@apache.org.
remove timing dependency
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7dc522d4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7dc522d4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7dc522d4
Branch: refs/heads/master
Commit: 7dc522d4c30736191aa256f07be051dc5e5f5e94
Parents: 4441001
Author: gtully <ga...@gmail.com>
Authored: Mon Apr 13 16:46:08 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 16 15:25:58 2015 +0100
----------------------------------------------------------------------
.../apache/activemq/network/DemandForwardingBridgeTest.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7dc522d4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 9794337..9c23781 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -75,6 +75,13 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
connection1.send(consumerInfo1.createRemoveCommand());
final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == destinationStatistics.getDispatched().getCount();
+ }
+ });
assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());