You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/10/21 23:02:56 UTC
svn commit: r1187538 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Author: tabish
Date: Fri Oct 21 21:02:56 2011
New Revision: 1187538
URL: http://svn.apache.org/viewvc?rev=1187538&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3559
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1187538&r1=1187537&r2=1187538&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Oct 21 21:02:56 2011
@@ -445,6 +445,7 @@ public abstract class MessageDatabase ex
}
// public for testing
+ @SuppressWarnings("rawtypes")
public Location getFirstInProgressTxLocation() {
Location l = null;
synchronized (inflightTransactions) {
@@ -854,7 +855,7 @@ public abstract class MessageDatabase ex
}
}
- if (!checkpointThread.isAlive()) {
+ if (checkpointThread == null || !checkpointThread.isAlive()) {
startCheckpoint();
}
return location;
@@ -967,6 +968,7 @@ public abstract class MessageDatabase ex
});
}
+ @SuppressWarnings("rawtypes")
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@@ -985,6 +987,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
List<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
@@ -1047,6 +1050,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
protected void process(KahaCommitCommand command, Location location, final Runnable after) throws IOException {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> inflightTx;
@@ -1077,6 +1081,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
protected void process(KahaPrepareCommand command, Location location) {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
synchronized (inflightTransactions) {
@@ -1087,6 +1092,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
protected void process(KahaRollbackCommand command, Location location) throws IOException {
TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
List<Operation> updates = null;
@@ -1101,6 +1107,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
private void persistRedeliveryCount(List<Operation> updates) throws IOException {
if (updates != null) {
for (Operation operation : updates) {
@@ -1557,7 +1564,7 @@ public abstract class MessageDatabase ex
if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
- if (metadata.version >= 3) {
+ if (metadata.version >= VERSION) {
value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
} else {
// upgrade
@@ -1963,7 +1970,9 @@ public abstract class MessageDatabase ex
// /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
// /////////////////////////////////////////////////////////////////
+ @SuppressWarnings("rawtypes")
private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
+ @SuppressWarnings("rawtypes")
protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
protected final Set<String> ackedAndPrepared = new HashSet<String>();
@@ -1994,6 +2003,7 @@ public abstract class MessageDatabase ex
}
}
+ @SuppressWarnings("rawtypes")
private List<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = TransactionIdConversion.convert(info);
List<Operation> tx;