You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/02/06 19:19:32 UTC

svn commit: r741659 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java

Author: chirino
Date: Fri Feb  6 18:19:31 2009
New Revision: 741659

URL: http://svn.apache.org/viewvc?rev=741659&view=rev
Log:
- added some handy generic visitors to the BTreeVisitor class.
- Updated the recovery process so it now rolls back changes applied to the index which did not get synced to the journal.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb  6 18:19:31 2009
@@ -360,25 +360,64 @@
 	        long start = System.currentTimeMillis();
 	        
 	        Location recoveryPosition = getRecoveryPosition();
-	        if( recoveryPosition ==null ) {
-	        	return;
+	        if( recoveryPosition!=null ) {
+		        int redoCounter = 0;
+		        while (recoveryPosition != null) {
+		            JournalCommand message = load(recoveryPosition);
+		            metadata.lastUpdate = recoveryPosition;
+		            process(message, recoveryPosition);
+		            redoCounter++;
+		            recoveryPosition = journal.getNextLocation(recoveryPosition);
+		        }
+		        long end = System.currentTimeMillis();
+	        	LOG.info("Replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
 	        }
-	        
-	        int redoCounter = 0;
-	        LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-	
-	        while (recoveryPosition != null) {
-	            JournalCommand message = load(recoveryPosition);
-	            metadata.lastUpdate = recoveryPosition;
-	            process(message, recoveryPosition);
-	            redoCounter++;
-	            recoveryPosition = journal.getNextLocation(recoveryPosition);
-	        }
-	        long end = System.currentTimeMillis();
-	        LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
+	     
+	        // We may have to undo some index updates.
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    recoverIndex(tx);
+                }
+            });
         }
     }
     
+	protected void recoverIndex(Transaction tx) throws IOException {
+        long start = System.currentTimeMillis();
+        // It is possible that index updates got applied before the journal updates;
+        // in that case we need to remove references to messages that are not in the journal.
+        final Location lastAppendLocation = journal.getLastAppendLocation();
+        long undoCounter=0;
+        
+        // Go through all the destinations to see if they have messages past the lastAppendLocation
+        for (StoredDestination sd : storedDestinations.values()) {
+        	
+            final ArrayList<Long> matches = new ArrayList<Long>();
+            // Find all the Locations that are greater than or equal to the last Append Location.
+            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+				@Override
+				protected void matched(Location key, Long value) {
+					matches.add(value);
+				}
+            });
+            
+            
+            for (Long sequenceId : matches) {
+                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+                sd.locationIndex.remove(tx, keys.location);
+                sd.messageIdIndex.remove(tx, keys.messageId);
+                undoCounter++;
+                // TODO: do we need to modify the ack positions for the pub sub case?
+			}
+        }
+        long end = System.currentTimeMillis();
+        if( undoCounter > 0 ) {
+        	// The rolled-back operations are basically in-flight journal writes.  To avoid getting these, the end user
+        	// should do sync writes to the journal.
+	        LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
+        }
+	}
+
 	private Location nextRecoveryPosition;
 	private Location lastRecoveryPosition;
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Feb  6 18:19:31 2009
@@ -43,4 +43,96 @@
      */
     void visit(List<Key> keys, List<Value> values);
     
+    
+    abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+		final private Key value;
+
+		public GTVisitor(Key value) {
+			this.value = value;
+		}
+
+		public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return second==null || second.compareTo(value)>0;
+		}
+
+		public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size(); i++) {
+				Key key = keys.get(i);
+				if( key.compareTo(value)>0 ) {
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+		final private Key value;
+
+		public GTEVisitor(Key value) {
+			this.value = value;
+		}
+
+		public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return second==null || second.compareTo(value)>=0;
+		}
+
+		public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size(); i++) {
+				Key key = keys.get(i);
+				if( key.compareTo(value)>=0 ) {
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+		final private Key value;
+
+		public LTVisitor(Key value) {
+			this.value = value;
+		}
+
+		public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return first==null || first.compareTo(value)<0;
+		}
+
+		public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size(); i++) {
+				Key key = keys.get(i);
+				if( key.compareTo(value)<0 ) {
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value>{
+		final private Key value;
+
+		public LTEVisitor(Key value) {
+			this.value = value;
+		}
+
+		public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return first==null || first.compareTo(value)<=0;
+		}
+
+		public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size(); i++) {
+				Key key = keys.get(i);
+				if( key.compareTo(value)<=0 ) {
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		abstract protected void matched(Key key, Value value);
+    }
 }
\ No newline at end of file