You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2018/04/27 12:49:13 UTC

activemq git commit: AMQ-6707 - mKahadb, track recovered tx per store for completion, resolve test regression

Repository: activemq
Updated Branches:
  refs/heads/master ea70e827c -> ceb97f6ba


AMQ-6707 - mKahadb, track recovered tx per store for completion, resolve test regression


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ceb97f6b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ceb97f6b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ceb97f6b

Branch: refs/heads/master
Commit: ceb97f6baa471006cf176f298e763224f8d1b58f
Parents: ea70e82
Author: gtully <ga...@gmail.com>
Authored: Fri Apr 27 13:48:51 2018 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Apr 27 13:48:51 2018 +0100

----------------------------------------------------------------------
 .../kahadb/MultiKahaDBTransactionStore.java     | 36 +++++++++++++++-----
 1 file changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ceb97f6b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index ff70076..f13fc53 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -19,7 +19,9 @@ package org.apache.activemq.store.kahadb;
 import java.io.File;
 import java.io.IOException;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -189,17 +191,25 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     }
 
     public class Tx {
-        private final Set<TransactionStore> stores = new HashSet<TransactionStore>();
+        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
         private int prepareLocationId = 0;
 
+        public void trackStore(TransactionStore store, XATransactionId xid) {
+            stores.put(store, xid);
+        }
+
         public void trackStore(TransactionStore store) {
-            stores.add(store);
+            stores.put(store, null);
         }
 
-        public Set<TransactionStore> getStores() {
+        public HashMap<TransactionStore, TransactionId> getStoresMap() {
             return stores;
         }
 
+        public Set<TransactionStore> getStores() {
+            return stores.keySet();
+        }
+
         public void trackPrepareLocation(Location location) {
             this.prepareLocationId = location.getDataFileId();
         }
@@ -240,8 +250,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
 
         Tx tx = getTx(txid);
         if (wasPrepared) {
-            for (TransactionStore store : tx.getStores()) {
-                store.commit(txid, true, null, null);
+            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
+                TransactionId recovered = storeTx.getValue();
+                if (recovered != null) {
+                    storeTx.getKey().commit(recovered, true, null, null);
+                } else {
+                    storeTx.getKey().commit(txid, true, null, null);
+                }
             }
         } else {
             // can only do 1pc on a single store
@@ -289,8 +304,13 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
     public void rollback(TransactionId txid) throws IOException {
         Tx tx = removeTx(txid);
         if (tx != null) {
-            for (TransactionStore store : tx.getStores()) {
-                store.rollback(txid);
+            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
+                TransactionId recovered = storeTx.getValue();
+                if (recovered != null) {
+                    storeTx.getKey().rollback(recovered);
+                } else {
+                    storeTx.getKey().rollback(txid);
+                }
             }
         }
     }
@@ -387,7 +407,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
                 @Override
                 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
                     try {
-                        getTx(xid).trackStore(adapter.createTransactionStore());
+                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
                     } catch (IOException e) {
                         LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
                     }