You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2018/10/10 14:23:44 UTC

[2/3] activemq git commit: AMQ-7067 - tidy up tests and add prepare variant, limit rollback location recording to xa case. There is still some work to do for the ack compaction case to make it aware of the tx records such that those are transferred as ne

AMQ-7067 - tidy up tests and add prepare variant, limit rollback location recording to xa case. There is still some work to do for the ack compaction case to make it aware of the tx records such that those are transferred as necessary

(cherry picked from commit 57c7939534a927bfc2d1b0454aac7ef8d804532b)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7fa85185
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7fa85185
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7fa85185

Branch: refs/heads/activemq-5.15.x
Commit: 7fa85185aae02414ff023727efc46680f5ead66a
Parents: cbe486f
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 9 12:01:47 2018 +0100
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Oct 10 10:23:21 2018 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |   2 +-
 .../org/apache/activemq/bugs/AMQ7067Test.java   | 128 ++++++++++++++-----
 2 files changed, 99 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7fa85185/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 82c4865..86dfcac 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
-        if (updates != null) {
+        if (key.isXATransaction() && updates != null) {
             for(Operation op : updates) {
                 recordAckMessageReferenceLocation(location, op.getLocation());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7fa85185/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index c1f34d0..d00ee41 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import javax.jms.*;
+import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.transaction.xa.XAException;
@@ -32,11 +34,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import static javax.transaction.xa.XAResource.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class AMQ7067Test {
 
@@ -82,11 +87,55 @@ public class AMQ7067Test {
         broker.start();
     }
 
+    @Test
+    public void testXAPrepare() throws Exception {
+
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+
+        MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        xaRes.end(txid, TMSUCCESS);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        xaRes.prepare(txid);
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 1 since we have only 1 prepared
+        assertEquals(1, xids.length);
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN);
+
+        System.out.println("****** recovered = " + xids);
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
+        assertEquals(1, xids.length);
+    }
 
     @Test
-    public void testAMQ7067XAcommit() throws Exception {
+    public void testXAcommit() throws Exception {
 
-        PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
         setupXAConnection();
 
         Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@@ -124,15 +173,14 @@ public class AMQ7067Test {
         setupXAConnection();
         xids = xaRes.recover(TMSTARTRSCAN);
 
-        // THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
+        // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
         assertEquals(1, xids.length);
 
     }
 
     @Test
-    public void testAMQ7067XArollback() throws Exception {
+    public void testXArollback() throws Exception {
 
-        PersistenceAdapterViewMBean kahadbView = getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
         setupXAConnection();
 
         Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@@ -176,11 +224,11 @@ public class AMQ7067Test {
     }
 
     @Test
-    public void testAMQ7067commit() throws Exception {
+    public void testCommit() throws Exception {
         final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
         connection.start();
 
-        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue holdKahaDb = session.createQueue("holdKahaDb");
         MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
         TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
@@ -192,14 +240,28 @@ public class AMQ7067Test {
 
         System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
         purgeQueue(queue.getQueueName());
-        Thread.sleep(10000);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
 
+
+        connection.close();
         curruptIndexFile(getDataDirectory());
 
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+        broker.waitUntilStarted();
 
         while(true) {
             try {
-                Thread.sleep(10000);
+                TimeUnit.SECONDS.sleep(1);
                 System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
                 break;
             } catch (Exception ex) {
@@ -208,18 +270,16 @@ public class AMQ7067Test {
             }
         }
 
-        connection.close();
-
         // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
         assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
     }
 
     @Test
-    public void testAMQ7067rollback() throws Exception {
+    public void testRollback() throws Exception {
         final Connection connection = ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
         connection.start();
 
-        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue holdKahaDb = session.createQueue("holdKahaDb");
         MessageProducer holdKahaDbProducer = session.createProducer(holdKahaDb);
         TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", 10));
@@ -231,26 +291,34 @@ public class AMQ7067Test {
 
         System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
         purgeQueue(queue.getQueueName());
-        Thread.sleep(10000);
-
-        curruptIndexFile(getDataDirectory());
 
-
-        while(true) {
-            try {
-                Thread.sleep(10000);
-                System.out.println(String.format("QueueSize %s: %d", holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
-                break;
-            } catch (Exception ex) {
-                System.out.println(ex.getMessage());
-                break;
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
             }
-        }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
 
         connection.close();
+        curruptIndexFile(getDataDirectory());
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+        broker.waitUntilStarted();
 
-        // THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION!
-        assertEquals(0, getQueueSize(holdKahaDb.getQueueName()));
+
+        // no sign of the test queue on recovery, rollback is the default for any inflight
+        // this test serves as a sanity check on existing behaviour
+        try {
+            getQueueSize(holdKahaDb.getQueueName());
+            fail("expect InstanceNotFoundException");
+        } catch (UndeclaredThrowableException expected) {
+            assertTrue(expected.getCause() instanceof InstanceNotFoundException);
+        }
     }
 
     protected static void createDanglingTransaction(XAResource xaRes, XASession xaSession, Queue queue) throws JMSException, IOException, XAException {
@@ -281,7 +349,7 @@ public class AMQ7067Test {
     }
 
     protected static void produce(Connection connection, Queue queue, int messageCount, int messageSize) throws JMSException, IOException, XAException {
-        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(queue);
 
         for (int i = 0; i < messageCount; i++) {