You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/19 16:23:02 UTC

svn commit: r1172640 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Author: tabish
Date: Mon Sep 19 14:23:02 2011
New Revision: 1172640

URL: http://svn.apache.org/viewvc?rev=1172640&view=rev
Log:
fixes for https://issues.apache.org/jira/browse/AMQ-3467 

Clean up some code in MessageDatabase and add log-level guards around logging calls to avoid
expensive string concatenation when the corresponding log level is not enabled.


Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1172640&r1=1172639&r2=1172640&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Sep 19 14:23:02 2011
@@ -147,7 +147,7 @@ public class MessageDatabase extends Ser
             }
             try {
                version = is.readInt();
-            }catch (EOFException expectedOnUpgrade) {
+            } catch (EOFException expectedOnUpgrade) {
                 version=1;
             }
             LOG.info("KahaDB is version " + version);
@@ -215,7 +215,6 @@ public class MessageDatabase extends Ser
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
 
-
     protected AtomicBoolean opened = new AtomicBoolean();
     private LockFile lockFile;
     private boolean ignoreMissingJournalfiles = false;
@@ -279,7 +278,7 @@ public class MessageDatabase extends Ser
                 }
             });
             pageFile.flush();
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -379,7 +378,6 @@ public class MessageDatabase extends Ser
     }
 
     public void load() throws IOException {
-
         this.indexLock.writeLock().lock();
         try {
             lock();
@@ -395,13 +393,11 @@ public class MessageDatabase extends Ser
 
             open();
             store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
-
     }
 
-
     public void close() throws IOException, InterruptedException {
         if( opened.compareAndSet(true, false)) {
             this.indexLock.writeLock().lock();
@@ -413,7 +409,7 @@ public class MessageDatabase extends Ser
                 });
                 pageFile.unload();
                 metadata = new Metadata();
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
             journal.close();
@@ -438,7 +434,7 @@ public class MessageDatabase extends Ser
                     }
                 });
             }
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
         close();
@@ -499,8 +495,10 @@ public class MessageDatabase extends Ser
                     redoCounter++;
                     recoveryPosition = journal.getNextLocation(recoveryPosition);
                 }
-                long end = System.currentTimeMillis();
-                LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+                if (LOG.isInfoEnabled()) {
+                    long end = System.currentTimeMillis();
+                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
+                }
             }
 
             // We may have to undo some index updates.
@@ -520,11 +518,13 @@ public class MessageDatabase extends Ser
                     }
                 }
                 for (TransactionId tx: toRollback) {
-                    LOG.debug("rolling back recovered indoubt local transaction " + tx);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
+                    }
                     store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                 }
             }
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -584,7 +584,6 @@ public class MessageDatabase extends Ser
                 }
             });
 
-
             for (Long sequenceId : matches) {
                 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                 sd.locationIndex.remove(tx, keys.location);
@@ -595,11 +594,13 @@ public class MessageDatabase extends Ser
             }
         }
 
-        long end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
-            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
-            // should do sync writes to the journal.
-            LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            // The rolledback operations are basically in flight journal writes.  To avoid getting
+            // these the end user should do sync writes to the journal.
+            if (LOG.isInfoEnabled()) {
+                long end = System.currentTimeMillis();
+                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            }
         }
 
         undoCounter = 0;
@@ -637,34 +638,36 @@ public class MessageDatabase extends Ser
             });
         }
         HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
-        while( !ss.isEmpty() ) {
-            missingJournalFiles.add( (int)ss.removeFirst() );
+        while (!ss.isEmpty()) {
+            missingJournalFiles.add((int) ss.removeFirst());
         }
-        missingJournalFiles.removeAll( journal.getFileMap().keySet() );
+        missingJournalFiles.removeAll(journal.getFileMap().keySet());
 
-        if( !missingJournalFiles.isEmpty() ) {
-            LOG.info("Some journal files are missing: "+missingJournalFiles);
+        if (!missingJournalFiles.isEmpty()) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Some journal files are missing: " + missingJournalFiles);
+            }
         }
 
         ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
         for (Integer missing : missingJournalFiles) {
-            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
+            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
         }
 
-        if ( checkForCorruptJournalFiles ) {
+        if (checkForCorruptJournalFiles) {
             Collection<DataFile> dataFiles = journal.getFileMap().values();
             for (DataFile dataFile : dataFiles) {
                 int id = dataFile.getDataFileId();
-                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
+                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
-                while( seq!=null ) {
-                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
+                while (seq != null) {
+                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
                     seq = seq.getNext();
                 }
             }
         }
 
-        if( !missingPredicates.isEmpty() ) {
+        if (!missingPredicates.isEmpty()) {
             for (StoredDestination sd : storedDestinations.values()) {
 
                 final ArrayList<Long> matches = new ArrayList<Long>();
@@ -676,7 +679,7 @@ public class MessageDatabase extends Ser
                 });
 
                 // If somes message references are affected by the missing data files...
-                if( !matches.isEmpty() ) {
+                if (!matches.isEmpty()) {
 
                     // We either 'gracefully' recover dropping the missing messages or
                     // we error out.
@@ -697,11 +700,13 @@ public class MessageDatabase extends Ser
             }
         }
 
-        end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
             // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
             // should do sync writes to the journal.
-            LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            if (LOG.isInfoEnabled()) {
+                long end = System.currentTimeMillis();
+                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+            }
         }
     }
 
@@ -725,7 +730,7 @@ public class MessageDatabase extends Ser
                 process(message, lastRecoveryPosition, (Runnable)null);
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -766,16 +771,18 @@ public class MessageDatabase extends Ser
                     checkpointUpdate(tx, cleanup);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
+
         long end = System.currentTimeMillis();
-        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: cleanup took "+(end-start));
+        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
+            }
         }
     }
 
-
     public void checkpoint(Callback closure) throws Exception {
         this.indexLock.writeLock().lock();
         try {
@@ -785,7 +792,7 @@ public class MessageDatabase extends Ser
                 }
             });
             closure.execute();
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -819,7 +826,9 @@ public class MessageDatabase extends Ser
             process(data, location, after);
             long end = System.currentTimeMillis();
             if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-                LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
+                }
             }
 
             if (after != null) {
@@ -860,7 +869,9 @@ public class MessageDatabase extends Ser
         ByteSequence data = journal.read(location);
         long end = System.currentTimeMillis();
         if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
-            LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
+            }
         }
         DataByteArrayInputStream is = new DataByteArrayInputStream(data);
         byte readByte = is.readByte();
@@ -960,7 +971,7 @@ public class MessageDatabase extends Ser
                         upadateIndex(tx, command, location);
                     }
                 });
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
         }
@@ -978,11 +989,10 @@ public class MessageDatabase extends Ser
                         updateIndex(tx, command, location);
                     }
                 });
-            }finally {
+            } finally {
                 this.indexLock.writeLock().unlock();
             }
         }
-
     }
 
     protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
@@ -993,7 +1003,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -1006,7 +1016,7 @@ public class MessageDatabase extends Ser
                     updateIndex(tx, command, location);
                 }
             });
-        }finally {
+        } finally {
             this.indexLock.writeLock().unlock();
         }
     }
@@ -1255,7 +1265,9 @@ public class MessageDatabase extends Ser
             final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
             final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
 
-            LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Last update: " + firstTxLocation + ", full gc candidates set: " + gcCandidateSet);
+            }
 
             // Don't GC files under replication
             if( journalFilesBeingReplicated!=null ) {
@@ -1282,7 +1294,9 @@ public class MessageDatabase extends Ser
                         break;
                     }
                 }
-                LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("gc candidates after first tx:" + firstTxLocation + ", " + gcCandidateSet);
+                }
             }
 
             // Go through all the destinations to see if any of them can remove GC candidates.
@@ -1329,11 +1343,15 @@ public class MessageDatabase extends Ser
                         }
                     }
                 });
-                LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
+                }
             }
 
             // check we are not deleting file with ack for in-use journal files
-            LOG.trace("gc candidates: " + gcCandidateSet);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("gc candidates: " + gcCandidateSet);
+            }
             final TreeSet<Integer> gcCandidates = new TreeSet<Integer>(gcCandidateSet);
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
@@ -1350,14 +1368,18 @@ public class MessageDatabase extends Ser
                     if (gcCandidateSet.contains(candidate)) {
                         ackMessageFileMap.remove(candidate);
                     } else {
-                        LOG.trace("not removing data file: " + candidate
-                                + " as contained ack(s) refer to referenced file: " + referencedFileIds);
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("not removing data file: " + candidate
+                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
+                        }
                     }
                 }
             }
 
-            if( !gcCandidateSet.isEmpty() ) {
-                LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+            if (!gcCandidateSet.isEmpty()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
+                }
                 journal.removeDataFiles(gcCandidateSet);
             }
         }
@@ -1382,7 +1404,6 @@ public class MessageDatabase extends Ser
     // StoredDestination related implementation methods.
     // /////////////////////////////////////////////////////////////////
 
-
     private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
     class StoredSubscription {
@@ -1615,7 +1636,6 @@ public class MessageDatabase extends Ser
         return rc;
     }
 
-
     protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
         String key = key(destination);
         StoredDestination rc = storedDestinations.get(key);
@@ -2357,7 +2377,6 @@ public class MessageDatabase extends Ser
             }
         }
 
-
         void remove(Transaction tx) throws IOException {
             defaultPriorityIndex.clear(tx);
             defaultPriorityIndex.unload(tx);
@@ -2508,8 +2527,6 @@ public class MessageDatabase extends Ser
             final Iterator<Entry<Long, MessageKeys>>defaultIterator;
             final Iterator<Entry<Long, MessageKeys>>lowIterator;
 
-
-
             MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
                 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
                 if (highPriorityIndex != null) {
@@ -2560,6 +2577,7 @@ public class MessageDatabase extends Ser
                         }
                         return false;
                     }
+
                     if (currentIterator == defaultIterator) {
                         if (lowIterator.hasNext()) {
                             currentIterator = lowIterator;