You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2019/06/14 14:34:39 UTC

[activemq] branch activemq-5.15.x updated (34a235b -> 6ff79d8)

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a change to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git.


    from 34a235b  AMQ-7221 - Delete Scheduled messages causes ActiveMQ to create/write an unnecessarily huge transaction file
     new 8f8fda2  AMQ-7221 - Fix InMemoryJobSchedulerManagementTest that was broken after patch
     new 996081e  AMQ-7227 - ensure db.free file is moved in scheduler store upgrade process
     new ef0ec42  AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test
     new 6ff79d8  AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kahadb/MultiKahaDBPersistenceAdapter.java      |  10 +-
 .../store/kahadb/MultiKahaDBTransactionStore.java  |  35 +++-
 .../store/kahadb/disk/journal/Journal.java         |  11 +-
 .../kahadb/scheduler/JobSchedulerStoreImpl.java    |   2 +-
 .../scheduler/JobSchedulerManagementTest.java      |  11 +-
 ...AMQ4407Test.java => MKahaDBTxRecoveryTest.java} | 195 ++++++++++++---------
 6 files changed, 164 insertions(+), 100 deletions(-)
 copy activemq-unit-tests/src/test/java/org/apache/activemq/bugs/{AMQ4407Test.java => MKahaDBTxRecoveryTest.java} (52%)


[activemq] 03/04: AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit ef0ec42885dc82bb31f266cc6cfe3c4065438453
Author: gtully <ga...@gmail.com>
AuthorDate: Mon Jun 10 15:31:23 2019 +0100

    AMQ-7225 - defer cleanup task operation till recovery processing complete, track prepared location in recovered ops to ensure they are retained on recovery failure. Fix and test
    
    (cherry picked from commit 93e726d6a7ba9ed44f5440369f8f9f1b41f49373)
---
 .../kahadb/MultiKahaDBPersistenceAdapter.java      |  10 +-
 .../store/kahadb/MultiKahaDBTransactionStore.java  |  23 ++-
 .../store/kahadb/disk/journal/Journal.java         |  11 +-
 .../activemq/bugs/MKahaDBTxRecoveryTest.java       | 224 +++++++++++++++++++++
 4 files changed, 262 insertions(+), 6 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index 4bdb8de..56e5e92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,7 +56,6 @@ import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionIdTransformerAware;
 import org.apache.activemq.store.TransactionStore;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.StoreUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
@@ -554,6 +553,15 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 
+
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        transactionStore.setJournalCleanupInterval(journalCleanupInterval);
+    }
+
+    public long getJournalCleanupInterval() {
+        return transactionStore.getJournalCleanupInterval();
+    }
+
     public List<PersistenceAdapter> getAdapters() {
         return Collections.unmodifiableList(adapters);
     }
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index ff70076..5befa92 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -66,6 +66,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
     private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean recovered = new AtomicBoolean(false);
+    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
 
     public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
         this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@@ -188,6 +190,14 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         this.journalWriteBatchSize = journalWriteBatchSize;
     }
 
+    public void setJournalCleanupInterval(long journalCleanupInterval) {
+        this.journalCleanupInterval = journalCleanupInterval;
+    }
+
+    public long getJournalCleanupInterval() {
+        return journalCleanupInterval;
+    }
+
     public class Tx {
         private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
         private int prepareLocationId = 0;
@@ -308,14 +318,19 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             journal.setDirectory(getDirectory());
             journal.setMaxFileLength(journalMaxFileLength);
             journal.setWriteBatchSize(journalWriteBatchSize);
+            journal.setCleanupInterval(journalCleanupInterval);
             IOHelper.mkdirs(journal.getDirectory());
             journal.start();
             recoverPendingLocalTransactions();
+            recovered.set(true);
             store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
         }
     }
 
     private void txStoreCleanup() {
+        if (!recovered.get()) {
+            return;
+        }
         Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
@@ -342,7 +357,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     private void recoverPendingLocalTransactions() throws IOException {
         Location location = journal.getNextLocation(null);
         while (location != null) {
-            process(load(location));
+            process(location, load(location));
             location = journal.getNextLocation(location);
         }
         recoveredPendingCommit.addAll(inflightTransactions.keySet());
@@ -361,11 +376,11 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         return message;
     }
 
-    public void process(JournalCommand<?> command) throws IOException {
+    public void process(final Location location, JournalCommand<?> command) throws IOException {
         switch (command.type()) {
             case KAHA_PREPARE_COMMAND:
                 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
-                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
                 break;
             case KAHA_COMMIT_COMMAND:
                 KahaCommitCommand commitCommand = (KahaCommitCommand) command;
@@ -405,7 +420,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
                         if (recoveredPendingCommit.contains(txid)) {
                             LOG.info("delivering pending commit outcome for tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-
+                            recoveredPendingCommit.remove(txid);
                         } else {
                             LOG.info("delivering rollback outcome to store for tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 67d4c86..9a6e256 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -124,6 +124,14 @@ public class Journal {
         }
     }
 
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
     public enum PreallocationStrategy {
         SPARSE_FILE,
         OS_KERNEL_COPY,
@@ -230,6 +238,7 @@ public class Journal {
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
     private File osKernelCopyTemplateFile = null;
     private ByteBuffer preAllocateDirectBuffer = null;
+    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
 
     protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
 
@@ -345,7 +354,7 @@ public class Journal {
             public void run() {
                 cleanup();
             }
-        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
+        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
 
         long end = System.currentTimeMillis();
         LOG.trace("Startup took: "+(end-start)+" ms");
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
new file mode 100644
index 0000000..4a7e9c6
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBTxRecoveryTest {
+
+    static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String PREFIX_DESTINATION_NAME = "queue";
+
+    private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
+    private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
+    private final static int CLEANUP_INTERVAL_MILLIS = 500;
+
+    BrokerService broker;
+    private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();
+
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    @Test
+    public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
+
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+
+        final AtomicBoolean injectFailure = new AtomicBoolean(true);
+
+        final AtomicInteger reps = new AtomicInteger();
+        final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
+
+        TransactionIdTransformer faultInjector  = new TransactionIdTransformer() {
+            @Override
+            public TransactionId transform(TransactionId txid) {
+                if (injectFailure.get() && reps.incrementAndGet() > 5) {
+                    throw new RuntimeException("Bla");
+                }
+                return delegate.get().transform(txid);
+            }
+        };
+        // set up kahadb to fail after N ops
+        for (KahaDBPersistenceAdapter pa : kahadbs) {
+            if (delegate.get() == null) {
+                delegate.set(pa.getStore().getTransactionIdTransformer());
+            }
+            pa.setTransactionIdTransformer(faultInjector);
+        }
+
+        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        c.start();
+        Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME  + "," + DESTINATION_NAME_2));
+        producer.send(s.createTextMessage("HI"));
+        try {
+            s.commit();
+        } catch (Exception expected) {
+            expected.printStackTrace();
+        }
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+
+        final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+
+        assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
+            }
+        }));
+
+        // check completion on recovery
+        injectFailure.set(false);
+
+        // fire in many more local transactions to use N txStore journal files
+        for (int i=0; i<100; i++) {
+            producer.send(s.createTextMessage("HI"));
+            s.commit();
+        }
+
+        broker.stop();
+
+        // fail recovery processing on first attempt
+        prepareBrokerWithMultiStore(false);
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+
+            @Override
+            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+                // longer than CleanupInterval
+                TimeUnit.SECONDS.sleep( 2);
+                throw new RuntimeException("Sorry");
+            }
+        }});
+        broker.start();
+
+        // second recovery attempt should sort it
+        broker.stop();
+        prepareBrokerWithMultiStore(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+        // verify commit completed
+        Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+
+        destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+        assertEquals(101, destination.getMessageStore().getMessageCount());
+    }
+
+
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        kahadbs.add(kaha);
+        return kaha;
+    }
+
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
+        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+	private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+			throws IOException {
+		FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix + ".>");
+        }
+		return template;
+	}
+}
\ No newline at end of file


[activemq] 01/04: AMQ-7221 - Fix InMemoryJobSchedulerManagementTest that was broken after patch

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 8f8fda26ec5a0e1da1a59e1f87361dee1682b9e8
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Fri Jun 14 10:28:57 2019 -0400

    AMQ-7221 - Fix InMemoryJobSchedulerManagementTest that was broken after
    patch
    
    (cherry picked from commit 814a286dfe64bfde749e54b8552b5197abe97b37)
---
 .../activemq/broker/scheduler/JobSchedulerManagementTest.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index 5d80c05..b1f3b2e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -60,8 +60,10 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService brokerService = createBroker(true);
-        ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
-        ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100* 1024);
+        if (isPersistent()) {
+            ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
+            ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100 * 1024);
+        }
         return brokerService;
     }
 
@@ -102,7 +104,10 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
         // Now wait and see if any get delivered, none should.
         latch.await(10, TimeUnit.SECONDS);
         assertEquals(latch.getCount(), COUNT);
-        assertEquals(1, getNumberOfJournalFiles());
+
+        if (isPersistent()) {
+            assertEquals(1, getNumberOfJournalFiles());
+        }
 
         connection.close();
     }


[activemq] 04/04: AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 6ff79d85aae0068ede78927b7ea13d2783d9c767
Author: gtully <ga...@gmail.com>
AuthorDate: Tue Jun 11 12:36:02 2019 +0100

    AMQ-7225 - fix intermittent failure, avoid gc of partial tx pending commit
    
    (cherry picked from commit 28a0cc6e5a78adb4b0b0134c860911c921f6a074)
---
 .../activemq/store/kahadb/MultiKahaDBTransactionStore.java | 14 +++++++++-----
 .../org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java    |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 5befa92..d1b2d8e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -61,7 +61,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
     final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
     final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
-    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
     private Journal journal;
     private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
@@ -279,10 +279,12 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
 
     public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
         tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
+        pendingCommit.put(txid, tx);
     }
 
     public void persistCompletion(TransactionId txid) throws IOException {
         store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+        pendingCommit.remove(txid);
     }
 
     private Location store(JournalCommand<?> data) throws IOException {
@@ -335,6 +337,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
         for (Tx tx : inflightTransactions.values()) {
             knownDataFileIds.remove(tx.getPreparedLocationId());
         }
+        for (Tx tx : pendingCommit.values()) {
+            knownDataFileIds.remove(tx.getPreparedLocationId());
+        }
         try {
             journal.removeDataFiles(knownDataFileIds);
         } catch (Exception e) {
@@ -360,8 +365,8 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             process(location, load(location));
             location = journal.getNextLocation(location);
         }
-        recoveredPendingCommit.addAll(inflightTransactions.keySet());
-        LOG.info("pending local transactions: " + recoveredPendingCommit);
+        pendingCommit.putAll(inflightTransactions);
+        LOG.info("pending local transactions: " + pendingCommit.keySet());
     }
 
     public JournalCommand<?> load(Location location) throws IOException {
@@ -417,10 +422,9 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
             for (TransactionId txid : broker.getPreparedTransactions(null)) {
                 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                     try {
-                        if (recoveredPendingCommit.contains(txid)) {
+                        if (pendingCommit.keySet().contains(txid)) {
                             LOG.info("delivering pending commit outcome for tid: " + txid);
                             broker.commitTransaction(null, txid, false);
-                            recoveredPendingCommit.remove(txid);
                         } else {
                             LOG.info("delivering rollback outcome to store for tid: " + txid);
                             broker.forgetTransaction(null, txid);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
index 4a7e9c6..da96431 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -207,7 +207,7 @@ public class MKahaDBTxRecoveryTest {
 
         multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
         multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
-        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+        multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
 
         broker = createBroker(multiKahaDBPersistenceAdapter);
     }


[activemq] 02/04: AMQ-7227 - ensure db.free file is moved in scheduler store upgrade process

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git

commit 996081ec2d4450c38887e47f65bfd9bd9107ab14
Author: gtully <ga...@gmail.com>
AuthorDate: Fri Jun 14 10:01:57 2019 +0100

    AMQ-7227 - ensure db.free file is moved in scheduler store upgrade process
    
    (cherry picked from commit 4d5e41ca284d884d0372b489dfa082e8f5d88cb0)
---
 .../apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 781ee21..cbaabe2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -318,7 +318,7 @@ public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSch
 
             @Override
             public boolean accept(File dir, String name) {
-                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
+                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log") || name.endsWith(".free")) {
                     return true;
                 }
                 return false;