You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/31 15:07:57 UTC

svn commit: r1163613 - in /activemq/trunk: activemq-camel/src/test/java/org/apache/activemq/camel/ activemq-camel/src/test/resources/org/apache/activemq/camel/ activemq-core/ activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/s...

Author: gtully
Date: Wed Aug 31 13:07:57 2011
New Revision: 1163613

URL: http://svn.apache.org/viewvc?rev=1163613&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-2868 - rework to remove the sync on transaction completion; cursor updates are now stacked so that they occur in order, independent of thread execution, after waiting for the journal to complete a write. This ensures that the cursors are updated in the same order as the index while still working without the index lock. TransactedConsumeTest shows horizontal scaling now works better with transactions. Reworked metadata.lastUpdate to always work with the existing index lock rather than reacquire it; this may help with the spurious gc journal data file issue on windows - https://issues.apache.org/jira/browse/AMQ-3470

Modified:
    activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
    activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java

Modified: activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java (original)
+++ activemq/trunk/activemq-camel/src/test/java/org/apache/activemq/camel/TransactedConsumeTest.java Wed Aug 31 13:07:57 2011
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.camel;
 
-import java.io.File;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.jms.Connection;
 import javax.jms.MessageProducer;
@@ -28,7 +27,7 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.store.amq.AMQPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.Wait;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -88,9 +87,12 @@ public class TransactedConsumeTest exten
 
         brokerService.setAdvisorySupport(false);
         brokerService.setDataDirectory("target/data");
-        AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
-        amq.setDirectory(new File("target/data"));
-        brokerService.setPersistenceAdapter(amq);
+        //AMQPersistenceAdapter amq = new AMQPersistenceAdapter();
+        //amq.setDirectory(new File("target/data"));
+        //brokerService.setPersistenceAdapter(amq);
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter)
+                brokerService.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
         brokerService.addConnector("tcp://localhost:61616");
         return brokerService;
     }

Modified: activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml (original)
+++ activemq/trunk/activemq-camel/src/test/resources/org/apache/activemq/camel/transactedconsume.xml Wed Aug 31 13:07:57 2011
@@ -28,22 +28,17 @@
     <context:annotation-config/>
 
     <bean id="vhfBatchListenerJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-        <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1000"/>
+        <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=1"/>
     </bean>
 
     <bean id="vhfBatchListenerPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
-        <!-- match maxConnections to the number of routes that share the  connection factory -->
-        <property name="maxConnections" value="2"/>
-        <!-- match maximumActive (which is active sessions) to num routes *  concurrentConsumers in the MLC -->
-        <property name="maximumActive" value="20"/>
+        <!-- match maxConnections to the number of routes that share the connection factory -->
+        <property name="maxConnections" value="10"/>
+        <!-- match maximumActive (which is active sessions) >=  concurrentConsumers in the MLC -->
+        <property name="maximumActive" value="1"/>
         <property name="connectionFactory" ref="vhfBatchListenerJMSConnectionFactory"/>
     </bean>
 
-    <!-- bean id="vhfBatchListenerSingleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
-       <property name="reconnectOnException" value="true" />
-       <property name="targetConnectionFactory" ref="vhfBatchListenerJMSConnectionFactory" />
-   </bean -->
-
     <!-- JMS Transaction manager -->
     <bean id="vhfBatchListenerJMSTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
         <property name="connectionFactory" ref="vhfBatchListenerPooledConnectionFactory"/>
@@ -53,9 +48,8 @@
     <bean id="vhfBatchListenerJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
         <property name="connectionFactory" ref="vhfBatchListenerPooledConnectionFactory"/>
         <property name="transactionManager" ref="vhfBatchListenerJMSTransactionManager"/>
-        <property name="receiveTimeout" value="20000" />
         <property name="transacted" value="true"/>
-        <property name="concurrentConsumers" value="10"/>
+        <property name="concurrentConsumers" value="1"/>
         <property name="cacheLevelName" value="CACHE_CONSUMER"/>
     </bean>
 
@@ -72,6 +66,30 @@
     <bean id="activemq2" class="org.apache.activemq.camel.component.ActiveMQComponent">
         <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
     </bean>
+    <bean id="activemq3" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq4" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq5" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq6" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq7" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq8" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq9" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
+    <bean id="activemq10" class="org.apache.activemq.camel.component.ActiveMQComponent">
+        <property name="configuration" ref="vhfBatchListenerJMSConfig"/>
+    </bean>
 
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <route>
@@ -79,11 +97,44 @@
             <process ref="connectionLog"/>
         </route>
 
-        <!-- better through put with a second route/connection once shared pool config matches concurrentConsumers -->
+        <!-- better through put with a additional route/connection once shared pool config matches concurrentConsumers -->
         <route>
             <from uri="activemq2:queue:scp_transacted"/>
             <process ref="connectionLog"/>
         </route>
+        <route>
+            <from uri="activemq3:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq4:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq5:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq6:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq7:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq8:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq9:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+        <route>
+            <from uri="activemq10:queue:scp_transacted"/>
+            <process ref="connectionLog"/>
+        </route>
+
     </camelContext>
 
     <bean id="connectionLog" class="org.apache.activemq.camel.TransactedConsumeTest.ConnectionLog"/>

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Aug 31 13:07:57 2011
@@ -463,7 +463,6 @@
             <exclude>**/QuickJournalRecoveryBrokerTest.*</exclude>
             <exclude>**/QuickJournalXARecoveryBrokerTest.*</exclude>
             <exclude>**/RendezvousDiscoverTransportTest.*</exclude>
-            <exclude>**/MissingDataFileTest.*</exclude>
 
             <!-- m2 tests failing since move from assembly  -->
             <exclude>**/QueueConsumerCloseAndReconnectTest.*</exclude>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java Wed Aug 31 13:07:57 2011
@@ -286,13 +286,10 @@ public class KahaDBTransactionStore impl
 
             } else {
                 KahaTransactionInfo info = getTransactionInfo(txid);
-                // ensure message order w.r.t to cursor and store for setBatch()
-                synchronized (this) {
                     for (Journal journal : theStore.getJournalManager().getJournals()) {
                         theStore.store(journal, new KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
                     }
                     forgetRecoveredAcks(txid);
-                }
             }
         } else {
             LOG.error("Null transaction passed on commit");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Aug 31 13:07:57 2011
@@ -32,7 +32,6 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
@@ -678,7 +677,7 @@ public class MessageDatabase extends Ser
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
                 JournalCommand<?> message = load(journal, lastRecoveryPosition);
-                process(message, lastRecoveryPosition);
+                process(message, lastRecoveryPosition, (Runnable)null);
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
         } finally {
@@ -779,24 +778,30 @@ public class MessageDatabase extends Ser
             long start = System.currentTimeMillis();
             Location location = journal.write(os.toByteSequence(), sync);
             long start2 = System.currentTimeMillis();
-            process(data, location);
+            process(data, location, after);
             long end = System.currentTimeMillis();
             if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
                 LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
             }
 
-            this.indexLock.writeLock().lock();
-            try {
-                metadata.lastUpdate = location;
-            } finally {
-                this.indexLock.writeLock().unlock();
+            if (after != null) {
+                Runnable afterCompletion = null;
+                synchronized (orderedTransactionAfters) {
+                    if (!orderedTransactionAfters.empty()) {
+                        afterCompletion = orderedTransactionAfters.pop();
+                    }
+                }
+                if (afterCompletion != null) {
+                    afterCompletion.run();
+                } else {
+                    // non persistent message case
+                    after.run();
+                }
             }
+
             if (!checkpointThread.isAlive()) {
                 startCheckpoint();
             }
-            if (after != null) {
-                after.run();
-            }
             return location;
         } catch (IOException ioe) {
             LOG.error("KahaDB failed to store to Journal", ioe);
@@ -831,7 +836,7 @@ public class MessageDatabase extends Ser
      */
     void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
-            process(data, location);
+            process(data, location, (Runnable) null);
         } else {
             // just recover producer audit
             data.visit(new Visitor() {
@@ -848,7 +853,7 @@ public class MessageDatabase extends Ser
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    void process(JournalCommand<?> data, final Location location) throws IOException {
+    void process(JournalCommand<?> data, final Location location, final Runnable after) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws IOException {
@@ -867,7 +872,7 @@ public class MessageDatabase extends Ser
 
             @Override
             public void visit(KahaCommitCommand command) throws IOException {
-                process(command, location);
+                process(command, location, after);
             }
 
             @Override
@@ -884,6 +889,16 @@ public class MessageDatabase extends Ser
             public void visit(KahaSubscriptionCommand command) throws IOException {
                 process(command, location);
             }
+
+            @Override
+            public void visit(KahaProducerAuditCommand command) throws IOException {
+                processLocation(location);
+            }
+
+            @Override
+            public void visit(KahaTraceCommand command) {
+                processLocation(location);
+            }
         });
     }
 
@@ -950,7 +965,25 @@ public class MessageDatabase extends Ser
         }
     }
 
-    protected void process(KahaCommitCommand command, Location location) throws IOException {
+    protected void processLocation(final Location location) {
+        this.indexLock.writeLock().lock();
+        try {
+            metadata.lastUpdate = location;
+        } finally {
+            this.indexLock.writeLock().unlock();
+        }
+    }
+
+    private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
+    private void push(Runnable after) {
+        if (after != null) {
+            synchronized (orderedTransactionAfters) {
+                orderedTransactionAfters.push(after);
+            }
+        }
+    }
+
+    protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
         TransactionId key = key(command.getTransactionInfo());
         List<Operation> inflightTx;
         synchronized (inflightTransactions) {
@@ -973,6 +1006,8 @@ public class MessageDatabase extends Ser
                     }
                 }
             });
+            metadata.lastUpdate = location;
+            push(after);
         } finally {
             this.indexLock.writeLock().unlock();
         }
@@ -1046,6 +1081,7 @@ public class MessageDatabase extends Ser
         }
         // record this id in any event, initial send or recovery
         metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
+        metadata.lastUpdate = location;
     }
 
     void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
@@ -1079,6 +1115,7 @@ public class MessageDatabase extends Ser
             }
 
         }
+        metadata.lastUpdate = ackLocation;
     }
 
     Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
@@ -2224,6 +2261,7 @@ public class MessageDatabase extends Ser
                         cursor.lowPriorityCursorPosition = nextPosition.longValue();
                     }
                 } else {
+                    LOG.warn("setBatch: sequence " + sequence + " not found in orderindex:" + this);
                     lastDefaultKey = sequence;
                     cursor.defaultCursorPosition = nextPosition.longValue();
                 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java?rev=1163613&r1=1163612&r2=1163613&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java Wed Aug 31 13:07:57 2011
@@ -28,6 +28,7 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ public class MissingDataFileTest extends
 
     private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class);
     
-    private static int counter = 300;
+    private static int counter = 500;
 
     private static int hectorToHaloCtr;
     private static int xenaToHaloCtr;
@@ -94,12 +95,13 @@ public class MissingDataFileTest extends
    
         SystemUsage systemUsage;
         systemUsage = new SystemUsage();
-        systemUsage.getMemoryUsage().setLimit(1024 * 1024); // Just a few messags 
+        systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags
         broker.setSystemUsage(systemUsage);
         
-        AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
-        factory.setMaxFileLength(2*1024); // ~4 messages
-        factory.setCleanupInterval(1000); // every few second
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+        kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+        kahaDBPersistenceAdapter.setCleanupInterval(500);
+        broker.setPersistenceAdapter(kahaDBPersistenceAdapter);
         
         broker.start();
         LOG.info("Starting broker..");