You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/06/22 22:43:25 UTC

svn commit: r1353024 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ main/java/org/apache/activemq/transaction/ test/java/org/apac...

Author: gtully
Date: Fri Jun 22 20:43:24 2012
New Revision: 1353024

URL: http://svn.apache.org/viewvc?rev=1353024&view=rev
Log:
fix up destination statistics for recovered transactions, pending adds are not visible, but pending acks are still accounted for in the messages count, commit/rollback updates enqueues/dequeues/messages as expected - https://issues.apache.org/jira/browse/AMQ-3872, https://issues.apache.org/jira/browse/AMQ-3305 - both kahadb and jdbc suffered

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransactionBroker.java Fri Jun 22 20:43:24 2012
@@ -30,7 +30,6 @@ import javax.transaction.xa.XAException;
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.BaseCommand;
 import org.apache.activemq.command.ConnectionInfo;
@@ -139,19 +138,28 @@ public class TransactionBroker extends B
     private void registerSync(Destination destination, Transaction transaction, BaseCommand command) {
         Synchronization sync = new PreparedDestinationCompletion(destination, command.isMessage());
         // ensure one per destination in the list
-        transaction.removeSynchronization(sync);
-        transaction.addSynchronization(sync);
+        Synchronization existing = transaction.findMatching(sync);
+        if (existing != null) {
+           ((PreparedDestinationCompletion)existing).incrementOpCount();
+        } else {
+            transaction.addSynchronization(sync);
+        }
     }
 
     static class PreparedDestinationCompletion extends Synchronization {
         final Destination destination;
         final boolean messageSend;
+        int opCount = 1;
         public PreparedDestinationCompletion(final Destination destination, boolean messageSend) {
             this.destination = destination;
             // rollback relevant to acks, commit to sends
             this.messageSend = messageSend;
         }
 
+        public void incrementOpCount() {
+            opCount++;
+        }
+
         @Override
         public int hashCode() {
             return System.identityHashCode(destination) +
@@ -179,9 +187,14 @@ public class TransactionBroker extends B
         public void afterCommit() throws Exception {
             if (messageSend) {
                 destination.clearPendingMessages();
+                destination.getDestinationStatistics().getEnqueues().add(opCount);
+                destination.getDestinationStatistics().getMessages().add(opCount);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("cleared pending from afterCommit : " + destination);
                 }
+            } else {
+                destination.getDestinationStatistics().getDequeues().add(opCount);
+                destination.getDestinationStatistics().getMessages().subtract(opCount);
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java Fri Jun 22 20:43:24 2012
@@ -17,16 +17,10 @@
 package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -37,11 +31,9 @@ import org.apache.activemq.store.Message
 import org.apache.activemq.store.ProxyTopicMessageStore;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionRecoveryListener;
-import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
 import org.apache.activemq.store.memory.MemoryTransactionStore;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.SubscriptionKey;
 
 /**
  * respect 2pc prepare
@@ -305,6 +297,11 @@ public class JdbcMemoryTransactionStore 
                 JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicStores.get(lastAckCommand.getMessageAck().getDestination());
                 jdbcTopicMessageStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
                 lastAckCommand.setMessageStore(jdbcTopicMessageStore);
+            } else {
+                // when reading the store we ignore messages with non null XIDs but should include those with XIDS starting in - (pending acks in an xa transaction),
+                // but the sql is non portable to match BLOB with LIKE etc
+                // so we make up for it when we recover the ack
+                ((JDBCPersistenceAdapter)persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(removeMessageCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
             }
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Jun 22 20:43:24 2012
@@ -354,6 +354,7 @@ public class DefaultJDBCAdapter implemen
                 s.setLong(1, seq);
             } else {
                 byte[] xidVal = xid.getEncodedXidBytes();
+                xidVal[0] = '-';
                 setBinaryData(s, 1, xidVal);
                 s.setLong(2, seq);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transaction/Transaction.java Fri Jun 22 20:43:24 2012
@@ -71,6 +71,14 @@ public abstract class Transaction {
         }
     }
 
+    public Synchronization findMatching(Synchronization r) {
+        int existing = synchronizations.indexOf(r);
+        if (existing != -1) {
+            return synchronizations.get(existing);
+        }
+        return null;
+    }
+
     public void removeSynchronization(Synchronization r) {
         synchronizations.remove(r);
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1353024&r1=1353023&r2=1353024&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Fri Jun 22 20:43:24 2012
@@ -108,11 +108,13 @@ public class XARecoveryBrokerTest extend
         assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());
 
         TransactionId first = (TransactionId)dar.getData()[0];
+        int commitCount = 0;
         // via jmx, force outcome
         for (int i = 0; i < 4; i++) {
             RecoveredXATransactionViewMBean mbean =  getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
             if (i%2==0) {
                 mbean.heuristicCommit();
+                commitCount++;
             } else {
                 mbean.heuristicRollback();
             }
@@ -124,6 +126,9 @@ public class XARecoveryBrokerTest extend
         dar = (DataArrayResponse)response;
         assertEquals(0, dar.getData().length);
 
+        // verify messages available
+        assertEquals("enqueue count reflects outcome", commitCount, destinationView.getQueueSize());
+
         // verify mbeans gone
         try {
             RecoveredXATransactionViewMBean gone = getProxyToPreparedTransactionViewMBean(first);
@@ -547,11 +552,20 @@ public class XARecoveryBrokerTest extend
         assertNull(m);
         assertNoMessagesLeft(connection);
 
+        // validate destination depth via jmx
+        DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
+        assertEquals("enqueue count does not see prepared acks", 4, destinationView.getQueueSize());
+        assertEquals("enqueue count does not see prepared acks", 0, destinationView.getDequeueCount());
+
         connection.request(createCommitTransaction2Phase(connectionInfo, txid));
 
         // validate recovery complete
         dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
         assertEquals("there are no prepared tx", 0, dataArrayResponse.getData().length);
+
+        assertEquals("enqueue count does not see commited acks", 0, destinationView.getQueueSize());
+        assertEquals("enqueue count does not see commited acks", 4, destinationView.getDequeueCount());
+
     }
 
     public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {