You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/04/16 16:31:29 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5703 - further tests and fixes. Ensure early eof can be identified when checking for corruption and skip corruption on replay when the checksum is invalid

Repository: activemq
Updated Branches:
  refs/heads/master 4441001c4 -> 73db4d2bf


https://issues.apache.org/jira/browse/AMQ-5703 - further tests and fixes. Ensure early eof can be identified when checking for corruption and skip corruption on replay when the checksum is invalid


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/73db4d2b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/73db4d2b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/73db4d2b

Branch: refs/heads/master
Commit: 73db4d2bfda10584b2ac8e21240e25b9ef4d85c8
Parents: 7dc522d
Author: gtully <ga...@gmail.com>
Authored: Thu Apr 16 14:58:45 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 16 15:25:58 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  22 +-
 .../store/kahadb/disk/journal/Journal.java      |  24 +-
 .../JournalCorruptionEofIndexRecoveryTest.java  | 312 +++++++++++++++++++
 3 files changed, 351 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 3cc6879..32b7170 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -603,10 +603,19 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 int redoCounter = 0;
                 LOG.info("Recovering from the journal @" + recoveryPosition);
                 while (recoveryPosition != null) {
-                    JournalCommand<?> message = load(recoveryPosition);
-                    metadata.lastUpdate = recoveryPosition;
-                    process(message, recoveryPosition, lastIndoubtPosition);
-                    redoCounter++;
+                    try {
+                        JournalCommand<?> message = load(recoveryPosition);
+                        metadata.lastUpdate = recoveryPosition;
+                        process(message, recoveryPosition, lastIndoubtPosition);
+                        redoCounter++;
+                    } catch (IOException failedRecovery) {
+                        if (isIgnoreMissingJournalfiles()) {
+                            // track this dud location
+                            journal.corruptRecoveryLocation(recoveryPosition);
+                        } else {
+                            throw failedRecovery;
+                        }
+                    }
                     recoveryPosition = journal.getNextLocation(recoveryPosition);
                      if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                          LOG.info("@" + recoveryPosition +  ", "  + redoCounter + " entries recovered ..");
@@ -826,8 +835,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
 
         if (!missingPredicates.isEmpty()) {
-            for (StoredDestination sd : storedDestinations.values()) {
-
+            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
+                final StoredDestination sd = sdEntry.getValue();
                 final ArrayList<Long> matches = new ArrayList<Long>();
                 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                     @Override
@@ -847,6 +856,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                             MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                             sd.locationIndex.remove(tx, keys.location);
                             sd.messageIdIndex.remove(tx, keys.messageId);
+                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                             undoCounter++;
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 394e3c0..8c82cf2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -55,6 +55,28 @@ public class Journal {
     public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
     public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
 
+    // tackle corruption when checksum is disabled or corrupt with zeros, minimise data loss
+    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
+        DataFile dataFile = getDataFile(recoveryPosition);
+        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
+        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+        try {
+            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
+            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
+            LOG.info("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
+
+            // skip corruption on getNextLocation
+            recoveryPosition.setOffset((int) sequence.getLast() + 1);
+            recoveryPosition.setSize(-1);
+
+            dataFile.corruptedBlocks.add(sequence);
+
+        } catch (IOException e) {
+        } finally {
+            accessorPool.closeDataFileAccessor(reader);
+        }
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -301,7 +323,7 @@ public class Journal {
         try {
             while( true ) {
                 int size = checkBatchRecord(reader, location.getOffset());
-                if ( size>=0 ) {
+                if ( size>=0 && location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size <= dataFile.getLength()) {
                     location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
                 } else {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/73db4d2b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
new file mode 100644
index 0000000..948b543
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class JournalCorruptionEofIndexRecoveryTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JournalCorruptionEofIndexRecoveryTest.class);
+
+    ActiveMQConnectionFactory cf = null;
+    BrokerService broker = null;
+    private final Destination destination = new ActiveMQQueue("Test");
+    private String connectionUri;
+    private KahaDBPersistenceAdapter adapter;
+
+
+    protected void startBroker() throws Exception {
+        doStartBroker(true, false);
+    }
+
+
+    protected void restartBroker(boolean whackIndex) throws Exception {
+        restartBroker(whackIndex, false);
+    }
+
+    protected void restartBroker(boolean whackIndex, boolean forceRecoverIndex) throws Exception {
+        File dataDir = broker.getPersistenceAdapter().getDirectory();
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        if (whackIndex) {
+            File indexToDelete = new File(dataDir, "db.data");
+            LOG.info("Whacking index: " + indexToDelete);
+            indexToDelete.delete();
+        }
+
+        doStartBroker(false, forceRecoverIndex);
+    }
+
+
+    private void doStartBroker(boolean delete, boolean forceRecoverIndex) throws Exception {
+        broker = new BrokerService();
+        if (delete) {
+            IOHelper.deleteChildren(broker.getPersistenceAdapter().getDirectory());
+            IOHelper.delete(broker.getPersistenceAdapter().getDirectory());
+        }
+
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:0");
+
+        configurePersistence(broker, forceRecoverIndex);
+
+        connectionUri = "vm://localhost?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean forceRecoverIndex) throws Exception {
+        adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        adapter.setForceRecoverIndex(forceRecoverIndex);
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(5000);
+        adapter.setCleanupInterval(5000);
+
+        adapter.setCheckForCorruptJournalFiles(true);
+        adapter.setIgnoreMissingJournalfiles(true);
+
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+
+    @Test
+    public void testRecoveryAfterCorruptionEof() throws Exception {
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+
+        assertTrue("more than x files: " + numFiles, numFiles > 2);
+
+        corruptBatchEndEof(3);
+
+        restartBroker(false);
+
+        assertEquals("missing one message", 49, broker.getAdminView().getTotalMessageCount());
+
+        assertEquals("Drain", 49, drainQueue(49));
+
+    }
+
+    @Test
+    public void testRecoveryAfterCorruptionCheckSum() throws Exception {
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(4);
+
+        corruptBatchCheckSumSplash(1);
+
+        restartBroker(true);
+
+        assertEquals("missing one message", 3, broker.getAdminView().getTotalMessageCount());
+
+        assertEquals("Drain", 3, drainQueue(4));
+
+    }
+
+    @Test
+    public void testRecoveryAfterCorruptionCheckSumExistingIndex() throws Exception {
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(4);
+
+        corruptBatchCheckSumSplash(1);
+
+        restartBroker(false);
+
+        assertEquals("unnoticed", 4, broker.getAdminView().getTotalMessageCount());
+
+        assertEquals("Drain", 0, drainQueue(4));
+
+        // force recover index and loose one message
+        restartBroker(false, true);
+
+        assertEquals("missing one index recreation", 3, broker.getAdminView().getTotalMessageCount());
+
+        assertEquals("Drain", 3, drainQueue(4));
+
+    }
+
+    private void corruptBatchCheckSumSplash(int id) throws Exception{
+        Collection<DataFile> files =
+                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        DataFile dataFile = (DataFile) files.toArray()[0];
+        RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
+
+        ArrayList<Integer> batchPositions = findBatch(randomAccessFile, Integer.MAX_VALUE);
+        LOG.info("Batch positions: " + batchPositions);
+        int pos = batchPositions.get(1);
+        LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
+        randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length + 4);
+        // whack the batch control record checksum
+        randomAccessFile.writeLong(0l);
+
+        // mod the data size in the location header so reading blows
+        randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_SIZE);
+        int size = randomAccessFile.readInt();
+        byte type = randomAccessFile.readByte();
+
+        LOG.info("Read: size:" + size + ", type:" + type);
+
+        randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_SIZE);
+        size -= 1;
+        LOG.info("rewrite incorrect location size @:" + (pos + Journal.BATCH_CONTROL_RECORD_SIZE) + " as: " + size);
+        randomAccessFile.writeInt(size);
+
+        randomAccessFile.getChannel().force(true);
+
+    }
+
+    private void corruptBatchEndEof(int id) throws Exception{
+        Collection<DataFile> files =
+                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        DataFile dataFile = (DataFile) files.toArray()[id];
+        RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
+
+        ArrayList<Integer> batchPositions = findBatch(randomAccessFile, Integer.MAX_VALUE);
+        int pos = batchPositions.get(batchPositions.size() - 3);
+        LOG.info("corrupting checksum and size (to push it past eof) of batch record at:" + id + "-" + pos);
+        randomAccessFile.seek(pos + Journal.BATCH_CONTROL_RECORD_HEADER.length);
+        randomAccessFile.writeInt(31 * 1024 * 1024);
+        randomAccessFile.writeLong(0l);
+        randomAccessFile.getChannel().force(true);
+
+    }
+
+    private ArrayList<Integer> findBatch(RecoverableRandomAccessFile randomAccessFile, int where) throws IOException {
+        final ArrayList<Integer> batchPositions = new ArrayList<Integer>();
+        final ByteSequence header = new ByteSequence(Journal.BATCH_CONTROL_RECORD_HEADER);
+        byte data[] = new byte[1024 * 20];
+
+        ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data, 0, data.length));
+
+        int pos = 0;
+        for (int i = 0; i < where; i++) {
+            int found = bs.indexOf(header, pos);
+            if (found == -1) {
+                break;
+            }
+            batchPositions.add(found);
+            pos = found + Journal.BATCH_CONTROL_RECORD_HEADER.length - 1;
+        }
+
+        return batchPositions;
+    }
+
+
+    private int getNumberOfJournalFiles() throws IOException {
+
+        Collection<DataFile> files =
+                ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+        return reality;
+    }
+
+
+    private int produceMessages(Destination destination, int numToSend) throws Exception {
+        int sent = 0;
+        Connection connection = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numToSend; i++) {
+                producer.send(createMessage(session, i));
+                sent++;
+            }
+        } finally {
+            connection.close();
+        }
+
+        return sent;
+    }
+
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+        return produceMessages(destination, numToSend);
+    }
+
+    final String payload = new String(new byte[1024]);
+
+    private Message createMessage(Session session, int i) throws Exception {
+        return session.createTextMessage(payload + "::" + i);
+    }
+
+    private int drainQueue(int max) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        int count = 0;
+        while (count < max && consumer.receive(5000) != null) {
+            count++;
+        }
+        consumer.close();
+        connection.close();
+        return count;
+    }
+}


[2/2] activemq git commit: remove timing dependency

Posted by gt...@apache.org.
remove timing dependency


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7dc522d4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7dc522d4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7dc522d4

Branch: refs/heads/master
Commit: 7dc522d4c30736191aa256f07be051dc5e5f5e94
Parents: 4441001
Author: gtully <ga...@gmail.com>
Authored: Mon Apr 13 16:46:08 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 16 15:25:58 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/network/DemandForwardingBridgeTest.java   | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7dc522d4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
index 9794337..9c23781 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DemandForwardingBridgeTest.java
@@ -75,6 +75,13 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
         connection1.send(consumerInfo1.createRemoveCommand());
 
         final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == destinationStatistics.getDispatched().getCount();
+            }
+        });
         assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
         assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
         assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());