You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/06/14 14:34:42 UTC

[activemq] 03/04: AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit ef0ec42885dc82bb31f266cc6cfe3c4065438453
Author: gtully <ga...@gmail.com>
AuthorDate: Mon Jun 10 15:31:23 2019 +0100

    AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test
    
    (cherry picked from commit 93e726d6a7ba9ed44f5440369f8f9f1b41f49373)
---
 .../kahadb/MultiKahaDBPersistenceAdapter.java      |  10 +-
 .../store/kahadb/MultiKahaDBTransactionStore.java  |  23 ++-
 .../store/kahadb/disk/journal/Journal.java         |  11 +-
 .../activemq/bugs/MKahaDBTxRecoveryTest.java       | 224 +++++++++++++++++++++
 4 files changed, 262 insertions(+), 6 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4bdb8de..56e5e92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,7 +56,6 @@ import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionIdTransformerAware;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -554,6 +553,15 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 
+
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        transactionStore.setJournalCleanupInterval(journalCleanupInterval);
+    }
+
+    public long getJournalCleanupInterval() {
+        return transactionStore.getJournalCleanupInterval();
+    }
+
     public List<PersistenceAdapter> getAdapters() {
         return Collections.unmodifiableList(adapters);
     }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index ff70076..5befa92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -66,6 +66,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean recovered = new AtomicBoolean(false);
+    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
 
     public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
         this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@@ -188,6 +190,14 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         this.journalWriteBatchSize = journalWriteBatchSize;
     }
 
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        this.journalCleanupInterval = journalCleanupInterval;
+    }
+
+    public long getJournalCleanupInterval() {
+        return journalCleanupInterval;
+    }
+
     public class Tx {
         private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
         private int prepareLocationId = 0;
@@ -308,14 +318,19 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             journal.setDirectory(getDirectory());
             journal.setMaxFileLength(journalMaxFileLength);
             journal.setWriteBatchSize(journalWriteBatchSize);
+            journal.setCleanupInterval(journalCleanupInterval);
             IOHelper.mkdirs(journal.getDirectory());
             journal.start();
             recoverPendingLocalTransactions();
+            recovered.set(true);
             store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
         }
     }
 
     private void txStoreCleanup() {
+        if (!recovered.get()) {
+            return;
+        }
         Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
@@ -342,7 +357,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     private void recoverPendingLocalTransactions() throws IOException {
         Location location = journal.getNextLocation(null);
         while (location != null) {
-            process(load(location));
+            process(location, load(location));
             location = journal.getNextLocation(location);
         }
         recoveredPendingCommit.addAll(inflightTransactions.keySet());
@@ -361,11 +376,11 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return message;
     }
 
-    public void process(JournalCommand<?> command) throws IOException {
+    public void process(final Location location, JournalCommand<?> command) throws IOException {
         switch (command.type()) {
             case KAHA_PREPARE_COMMAND:
                 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
-                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
                 break;
             case KAHA_COMMIT_COMMAND:
                 KahaCommitCommand commitCommand = (KahaCommitCommand) command;
@@ -405,7 +420,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
                         if (recoveredPendingCommit.contains(txid)) {
                             LOG.info("delivering pending commit outcome for tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-
+                            recoveredPendingCommit.remove(txid);
                         } else {
                             LOG.info("delivering rollback outcome to store for tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 67d4c86..9a6e256 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -124,6 +124,14 @@ public class Journal {
         }
     }
 
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -230,6 +238,7 @@ public class Journal {
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
     private ByteBuffer preAllocateDirectBuffer = null;
+    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
 
     protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
 
@@ -345,7 +354,7 @@ public class Journal {
             public void run() {
                 cleanup();
             }
-        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
+        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
 
         long end = System.currentTimeMillis();
         LOG.trace("Startup took: "+(end-start)+" ms");
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
new file mode 100644
index 0000000..4a7e9c6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBTxRecoveryTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String PREFIX_DESTINATION_NAME = "queue";
+
+    private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
+    private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
+    private final static int CLEANUP_INTERVAL_MILLIS = 500;
+
+    BrokerService broker;
+    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();
+
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    @Test
+    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
+
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+
+        final AtomicBoolean injectFailure = new AtomicBoolean(true);
+
+        final AtomicInteger reps = new AtomicInteger();
+        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
+
+        TransactionIdTransformer faultInjector  = new TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                if (injectFailure.get() && reps.incrementAndGet() > 5) {
+                    throw new RuntimeException("Bla");
+                }
+                return delegate.get().transform(txid);
+            }
+        };
+        // install the fault injector on every kahadb store: transform() throws from its 6th call on while injectFailure is set
+        for (KahaDBPersistenceAdapter pa : kahadbs) {
+            if (delegate.get() == null) {
+                delegate.set(pa.getStore().getTransactionIdTransformer());
+            }
+            pa.setTransactionIdTransformer(faultInjector);
+        }
+
+        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        c.start();
+        Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME  + "," + DESTINATION_NAME_2));
+        producer.send(s.createTextMessage("HI"));
+        try {
+            s.commit();
+        } catch (Exception expected) {
+            expected.printStackTrace();
+        }
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+
+        final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+
+        assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
+            }
+        }));
+
+        // check completion on recovery
+        injectFailure.set(false);
+
+        // fire in many more local transactions to use N txStore journal files
+        for (int i=0; i<100; i++) {
+            producer.send(s.createTextMessage("HI"));
+            s.commit();
+        }
+
+        broker.stop();
+
+        // fail recovery processing on first attempt
+        prepareBrokerWithMultiStore(false);
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+
+            @Override
+            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+                // sleep for longer than CLEANUP_INTERVAL_MILLIS before failing the commit, so a periodic cleanup run is due during recovery
+                TimeUnit.SECONDS.sleep( 2);
+                throw new RuntimeException("Sorry");
+            }
+        }});
+        broker.start();
+
+        // the second recovery attempt, with no failing plugin installed, should complete the pending commit
+        broker.stop();
+        prepareBrokerWithMultiStore(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // verify commit completed
+        Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+
+        destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+    }
+
+
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        kahadbs.add(kaha);
+        return kaha;
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
+        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+	private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+			throws IOException {
+		FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix + ".>");
+        }
+		return template;
+	}
+}
\ No newline at end of file