You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/06 19:19:32 UTC
svn commit: r741659 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Author: chirino
Date: Fri Feb 6 18:19:31 2009
New Revision: 741659
URL: http://svn.apache.org/viewvc?rev=741659&view=rev
Log:
- Added some handy generic visitors to the BTreeVisitor class.
- Updated the recovery process so it now rolls back changes applied to the index that did not get synced to the journal.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 6 18:19:31 2009
@@ -360,25 +360,64 @@
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
- if( recoveryPosition ==null ) {
- return;
+ if( recoveryPosition!=null ) {
+ int redoCounter = 0;
+ while (recoveryPosition != null) {
+ JournalCommand message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition);
+ redoCounter++;
+ recoveryPosition = journal.getNextLocation(recoveryPosition);
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
-
- int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-
- while (recoveryPosition != null) {
- JournalCommand message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition);
- redoCounter++;
- recoveryPosition = journal.getNextLocation(recoveryPosition);
- }
- long end = System.currentTimeMillis();
- LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
+
+ // We may have to undo some index updates.
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ recoverIndex(tx);
+ }
+ });
}
}
+ protected void recoverIndex(Transaction tx) throws IOException {
+ long start = System.currentTimeMillis();
+ // It is possible that index updates got applied before the journal updates.
+ // In that case we need to remove references to messages that are not in the journal.
+ final Location lastAppendLocation = journal.getLastAppendLocation();
+ long undoCounter=0;
+
+ // Go through all the destinations to see if they have messages past the lastAppendLocation
+ for (StoredDestination sd : storedDestinations.values()) {
+
+ final ArrayList<Long> matches = new ArrayList<Long>();
+ // Find all the Locations that are >= the last append location.
+ sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+ @Override
+ protected void matched(Location key, Long value) {
+ matches.add(value);
+ }
+ });
+
+
+ for (Long sequenceId : matches) {
+ MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ sd.locationIndex.remove(tx, keys.location);
+ sd.messageIdIndex.remove(tx, keys.messageId);
+ undoCounter++;
+ // TODO: do we need to modify the ack positions for the pub sub case?
+ }
+ }
+ long end = System.currentTimeMillis();
+ if( undoCounter > 0 ) {
+ // The rolled-back operations are basically in-flight journal writes. To avoid these, the end user
+ // should do sync writes to the journal.
+ LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
+ }
+ }
+
private Location nextRecoveryPosition;
private Location lastRecoveryPosition;
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Feb 6 18:19:31 2009
@@ -43,4 +43,96 @@
*/
void visit(List<Key> keys, List<Value> values);
+
+ abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+ final private Key value;
+
+ public GTVisitor(Key value) {
+ this.value = value;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return second==null || second.compareTo(value)>0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for( int i=0; i < keys.size(); i++) {
+ Key key = keys.get(i);
+ if( key.compareTo(value)>0 ) {
+ matched(key, values.get(i));
+ }
+ }
+ }
+
+ abstract protected void matched(Key key, Value value);
+ }
+
+ abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+ final private Key value;
+
+ public GTEVisitor(Key value) {
+ this.value = value;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return second==null || second.compareTo(value)>=0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for( int i=0; i < keys.size(); i++) {
+ Key key = keys.get(i);
+ if( key.compareTo(value)>=0 ) {
+ matched(key, values.get(i));
+ }
+ }
+ }
+
+ abstract protected void matched(Key key, Value value);
+ }
+
+ abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+ final private Key value;
+
+ public LTVisitor(Key value) {
+ this.value = value;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return first==null || first.compareTo(value)<0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for( int i=0; i < keys.size(); i++) {
+ Key key = keys.get(i);
+ if( key.compareTo(value)<0 ) {
+ matched(key, values.get(i));
+ }
+ }
+ }
+
+ abstract protected void matched(Key key, Value value);
+ }
+
+ abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+ final private Key value;
+
+ public LTEVisitor(Key value) {
+ this.value = value;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second) {
+ return first==null || first.compareTo(value)<=0;
+ }
+
+ public void visit(List<Key> keys, List<Value> values) {
+ for( int i=0; i < keys.size(); i++) {
+ Key key = keys.get(i);
+ if( key.compareTo(value)<=0 ) {
+ matched(key, values.get(i));
+ }
+ }
+ }
+
+ abstract protected void matched(Key key, Value value);
+ }
}
\ No newline at end of file