You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/10/21 23:02:56 UTC

svn commit: r1187538 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Author: tabish
Date: Fri Oct 21 21:02:56 2011
New Revision: 1187538

URL: http://svn.apache.org/viewvc?rev=1187538&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3559

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1187538&r1=1187537&r2=1187538&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct 21 21:02:56 2011
@@ -445,6 +445,7 @@ public abstract class MessageDatabase ex
     }
 
     // public for testing
+    @SuppressWarnings("rawtypes")
     public Location getFirstInProgressTxLocation() {
         Location l = null;
         synchronized (inflightTransactions) {
@@ -854,7 +855,7 @@ public abstract class MessageDatabase ex
                 }
             }
 
-            if (!checkpointThread.isAlive()) {
+            if (checkpointThread == null || !checkpointThread.isAlive()) {
                 startCheckpoint();
             }
             return location;
@@ -967,6 +968,7 @@ public abstract class MessageDatabase ex
         });
     }
 
+    @SuppressWarnings("rawtypes")
     protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
             List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@@ -985,6 +987,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@@ -1047,6 +1050,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         List<Operation> inflightTx;
@@ -1077,6 +1081,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     protected void process(KahaPrepareCommand command, Location location) {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         synchronized (inflightTransactions) {
@@ -1087,6 +1092,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     protected void process(KahaRollbackCommand command, Location location)  throws IOException {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         List<Operation> updates = null;
@@ -1101,6 +1107,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     private void persistRedeliveryCount(List<Operation> updates)  throws IOException {
         if (updates != null) {
             for (Operation operation : updates) {
@@ -1557,7 +1564,7 @@ public abstract class MessageDatabase ex
             if (dataIn.readBoolean()) {
                 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
-                if (metadata.version >= 3) {
+                if (metadata.version >= VERSION) {
                     value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
                 } else {
                     // upgrade
@@ -1963,7 +1970,9 @@ public abstract class MessageDatabase ex
     // /////////////////////////////////////////////////////////////////
     // Transaction related implementation methods.
     // /////////////////////////////////////////////////////////////////
+    @SuppressWarnings("rawtypes")
     private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+    @SuppressWarnings("rawtypes")
     protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
     protected final Set<String> ackedAndPrepared = new HashSet<String>();
 
@@ -1994,6 +2003,7 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("rawtypes")
     private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
         TransactionId key = TransactionIdConversion.convert(info);
         List<Operation> tx;