You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/15 20:08:16 UTC

svn commit: r1468171 - in /activemq/trunk: activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/ activemq-kahadb-store/src/main/proto/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ activemq-unit-tests/src/test/java/org/apa...

Author: tabish
Date: Mon Apr 15 18:08:15 2013
New Revision: 1468171

URL: http://svn.apache.org/r1468171
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4212
fix for: https://issues.apache.org/jira/browse/AMQ-2832

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java   (with props)
Modified:
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
    activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Mon Apr 15 18:08:15 2013
@@ -54,9 +54,9 @@ import org.apache.activemq.ActiveMQMessa
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
@@ -85,7 +85,12 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.*;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,13 +106,12 @@ public abstract class MessageDatabase ex
         UNMATCHED = new Buffer(new byte[]{});
     }
     private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
-    private static final int DEFAULT_DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
     static final int CLOSED_STATE = 1;
     static final int OPEN_STATE = 2;
     static final long NOT_ACKED = -1;
 
-    static final int VERSION = 4;
+    static final int VERSION = 5;
 
     protected class Metadata {
         protected Page<Metadata> page;
@@ -116,7 +120,9 @@ public abstract class MessageDatabase ex
         protected Location lastUpdate;
         protected Location firstInProgressTransactionLocation;
         protected Location producerSequenceIdTrackerLocation = null;
+        protected Location ackMessageFileMapLocation = null;
         protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
+        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
         protected int version = VERSION;
         public void read(DataInput is) throws IOException {
             state = is.readInt();
@@ -144,6 +150,11 @@ public abstract class MessageDatabase ex
             } catch (EOFException expectedOnUpgrade) {
                 version=1;
             }
+            if (version >= 5 && is.readBoolean()) {
+                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
+            } else {
+                ackMessageFileMapLocation = null;
+            }
             LOG.info("KahaDB is version " + version);
         }
 
@@ -172,6 +183,12 @@ public abstract class MessageDatabase ex
                 os.writeBoolean(false);
             }
             os.writeInt(VERSION);
+            if (ackMessageFileMapLocation != null) {
+                os.writeBoolean(true);
+                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
+            } else {
+                os.writeBoolean(false);
+            }
         }
     }
 
@@ -452,6 +469,7 @@ public abstract class MessageDatabase ex
         return range;
     }
 
+    @SuppressWarnings("rawtypes")
     private void trackMaxAndMin(Location[] range, List<Operation> ops) {
         Location t = ops.get(0).getLocation();
         if (range[0]==null || t.compareTo(range[0]) <= 0) {
@@ -473,6 +491,7 @@ public abstract class MessageDatabase ex
         }
         HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
 
+        @SuppressWarnings("rawtypes")
         public void track(Operation operation) {
             if (location == null ) {
                 location = operation.getLocation();
@@ -543,9 +562,11 @@ public abstract class MessageDatabase ex
 
             long start = System.currentTimeMillis();
             Location producerAuditPosition = recoverProducerAudit();
+            Location ackMessageFileLocation = recoverAckMessageFileMap();
             Location lastIndoubtPosition = getRecoveryPosition();
 
-            Location recoveryPosition = minimum(producerAuditPosition, lastIndoubtPosition);
+            Location recoveryPosition = minimum(producerAuditPosition, ackMessageFileLocation);
+            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
 
             if (recoveryPosition != null) {
                 int redoCounter = 0;
@@ -631,6 +652,24 @@ public abstract class MessageDatabase ex
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private Location recoverAckMessageFileMap() throws IOException {
+        if (metadata.ackMessageFileMapLocation != null) {
+            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
+            try {
+                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
+                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
+                return journal.getNextLocation(metadata.ackMessageFileMapLocation);
+            } catch (Exception e) {
+                LOG.warn("Cannot recover ackMessageFileMap", e);
+                return journal.getNextLocation(null);
+            }
+        } else {
+            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
+            return journal.getNextLocation(null);
+        }
+    }
+
     protected void recoverIndex(Transaction tx) throws IOException {
         long start = System.currentTimeMillis();
         // It is possible index updates got applied before the journal updates..
@@ -760,7 +799,6 @@ public abstract class MessageDatabase ex
                             undoCounter++;
                             // TODO: do we need to modify the ack positions for the pub sub case?
                         }
-
                     } else {
                         throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
                     }
@@ -947,6 +985,9 @@ public abstract class MessageDatabase ex
         byte readByte = is.readByte();
         KahaEntryType type = KahaEntryType.valueOf(readByte);
         if( type == null ) {
+            try {
+                is.close();
+            } catch (IOException e) {}
             throw new IOException("Could not load journal record. Invalid location: "+location);
         }
         JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
@@ -1024,6 +1065,11 @@ public abstract class MessageDatabase ex
             }
 
             @Override
+            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
+                processLocation(location);
+            }
+
+            @Override
             public void visit(KahaTraceCommand command) {
                 processLocation(location);
             }
@@ -1223,20 +1269,16 @@ public abstract class MessageDatabase ex
                 }
             } else {
                 // If the message ID as indexed, then the broker asked us to
-                // store a DUP
-                // message. Bad BOY! Don't do it, and log a warning.
+                // store a DUP message. Bad BOY! Don't do it, and log a warning.
                 LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
                 sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                 sd.locationIndex.remove(tx, location);
                 rollbackStatsOnDuplicate(command.getDestination());
             }
         } else {
-            // restore the previous value.. Looks like this was a redo of a
-            // previously
-            // added message. We don't want to assign it a new id as the other
-            // indexes would
+            // restore the previous value.. Looks like this was a redo of a previously
+            // added message. We don't want to assign it a new id as the other indexes would
             // be wrong..
-            //
             sd.locationIndex.put(tx, location, previous);
         }
         // record this id in any event, initial send or recovery
@@ -1276,6 +1318,11 @@ public abstract class MessageDatabase ex
                     byte priority = sd.orderIndex.lastGetPriority();
                     sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                 }
+
+                MessageKeys keys = sd.orderIndex.get(tx, sequence);
+                if (keys != null) {
+                    recordAckMessageReferenceLocation(ackLocation, keys.location);
+                }
                 // The following method handles deleting un-referenced messages.
                 removeAckLocation(tx, sd, subscriptionKey, sequence);
             } else if (LOG.isDebugEnabled()) {
@@ -1286,13 +1333,12 @@ public abstract class MessageDatabase ex
         metadata.lastUpdate = ackLocation;
     }
 
-    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
     private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
-        Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
+        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
         if (referenceFileIds == null) {
             referenceFileIds = new HashSet<Integer>();
             referenceFileIds.add(messageLocation.getDataFileId());
-            ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
+            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
         } else {
             Integer id = Integer.valueOf(messageLocation.getDataFileId());
             if (!referenceFileIds.contains(id)) {
@@ -1325,6 +1371,10 @@ public abstract class MessageDatabase ex
             sd.ackPositions.clear(tx);
             sd.ackPositions.unload(tx);
             tx.free(sd.ackPositions.getHeadPageId());
+
+            sd.subLocations.clear(tx);
+            sd.subLocations.unload(tx);
+            tx.free(sd.subLocations.getHeadPageId());
         }
 
         String key = key(command.getDestination());
@@ -1339,6 +1389,7 @@ public abstract class MessageDatabase ex
         // If set then we are creating it.. otherwise we are destroying the sub
         if (command.hasSubscriptionInfo()) {
             sd.subscriptions.put(tx, subscriptionKey, command);
+            sd.subLocations.put(tx, subscriptionKey, location);
             long ackLocation=NOT_ACKED;
             if (!command.getRetroactive()) {
                 ackLocation = sd.orderIndex.nextMessageId-1;
@@ -1350,6 +1401,7 @@ public abstract class MessageDatabase ex
         } else {
             // delete the sub...
             sd.subscriptions.remove(tx, subscriptionKey);
+            sd.subLocations.remove(tx, subscriptionKey);
             sd.subscriptionAcks.remove(tx, subscriptionKey);
             sd.subscriptionCache.remove(subscriptionKey);
             removeAckLocationsForSub(tx, sd, subscriptionKey);
@@ -1394,6 +1446,7 @@ public abstract class MessageDatabase ex
 
         metadata.state = OPEN_STATE;
         metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
+        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
         Location[] inProgressTxRange = getInProgressTxLocationRange();
         metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
         tx.store(metadata.page, metadataMarshaller, true);
@@ -1432,6 +1485,14 @@ public abstract class MessageDatabase ex
                 }
             }
 
+            if (metadata.ackMessageFileMapLocation != null) {
+                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
+                gcCandidateSet.remove(dataFileId);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
+                }
+            }
+
             // Don't GC files referenced by in-progress tx
             if (inProgressTxRange[0] != null) {
                 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
@@ -1488,6 +1549,45 @@ public abstract class MessageDatabase ex
                         }
                     }
                 });
+
+                // Durable Subscription
+                if (entry.getValue().subLocations != null) {
+                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
+                    while (iter.hasNext()) {
+                        Entry<String, Location> subscription = iter.next();
+                        int dataFileId = subscription.getValue().getDataFileId();
+
+                        // Move subscription along if it has no outstanding messages that need ack'd
+                        // and its in the last log file in the journal.
+                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
+                            final StoredDestination destination = entry.getValue();
+                            final String subscriptionKey = subscription.getKey();
+                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
+
+                            // When pending is size one that is the next message Id meaning there
+                            // are no pending messages currently.
+                            if (pendingAcks == null || pendingAcks.size() <= 1) {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
+                                }
+
+                                final KahaSubscriptionCommand kahaSub =
+                                    destination.subscriptions.get(tx, subscriptionKey);
+                                destination.subLocations.put(
+                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
+
+                                // Skips the remove from candidates if we rewrote the subscription
+                                // in order to prevent duplicate subscription commands on recover.
+                                // If another subscription is on the same file and isn't rewritten
+                                // than it will remove the file from the set.
+                                continue;
+                            }
+                        }
+
+                        gcCandidateSet.remove(dataFileId);
+                    }
+                }
+
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                 }
@@ -1501,7 +1601,7 @@ public abstract class MessageDatabase ex
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
                 Integer candidate = candidates.next();
-                Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
+                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                 if (referencedFileIds != null) {
                     for (Integer referencedFileId : referencedFileIds) {
                         if (completeFileSet.contains(referencedFileId) && !gcCandidates.contains(referencedFileId)) {
@@ -1511,7 +1611,7 @@ public abstract class MessageDatabase ex
                         }
                     }
                     if (gcCandidateSet.contains(candidate)) {
-                        ackMessageFileMap.remove(candidate);
+                        metadata.ackMessageFileMap.remove(candidate);
                     } else {
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("not removing data file: " + candidate
@@ -1537,6 +1637,7 @@ public abstract class MessageDatabase ex
         public void run() {
         }
     };
+
     private Location checkpointProducerAudit() throws IOException {
         if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -1556,6 +1657,35 @@ public abstract class MessageDatabase ex
         return metadata.producerSequenceIdTrackerLocation;
     }
 
+    private Location checkpointAckMessageFileMap() throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oout = new ObjectOutputStream(baos);
+        oout.writeObject(metadata.ackMessageFileMap);
+        oout.flush();
+        oout.close();
+        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
+        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
+        try {
+            location.getLatch().await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+        return location;
+    }
+
+    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
+
+        ByteSequence sequence = toByteSequence(subscription);
+        Location location = journal.write(sequence, nullCompletionCallback) ;
+
+        try {
+            location.getLatch().await();
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException(e.toString());
+        }
+        return location;
+    }
+
     public HashSet<Integer> getJournalFilesBeingReplicated() {
         return journalFilesBeingReplicated;
     }
@@ -1566,13 +1696,6 @@ public abstract class MessageDatabase ex
 
     private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
 
-    class StoredSubscription {
-        SubscriptionInfo subscriptionInfo;
-        String lastAckId;
-        Location lastAckLocation;
-        Location cursor;
-    }
-
     static class MessageKeys {
         final String messageId;
         final Location location;
@@ -1677,6 +1800,7 @@ public abstract class MessageDatabase ex
         BTreeIndex<String, LastAck> subscriptionAcks;
         HashMap<String, MessageOrderCursor> subscriptionCursors;
         ListIndex<String, SequenceSet> ackPositions;
+        ListIndex<String, Location> subLocations;
 
         // Transient data used to track which Messages are no longer needed.
         final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
@@ -1730,6 +1854,9 @@ public abstract class MessageDatabase ex
                             // Now move the pending messages to ack data into the store backed
                             // structure.
                             value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
+                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
+                            value.ackPositions.load(tx);
                             for(String subscriptionKey : temp.keySet()) {
                                 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                             }
@@ -1737,26 +1864,41 @@ public abstract class MessageDatabase ex
                         }
                     });
                 }
-            }
-            if (metadata.version >= 2) {
-                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
-                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
-            } else {
+
+                if (metadata.version >= 5) {
+                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
+                } else {
                     // upgrade
                     pageFile.tx().execute(new Transaction.Closure<IOException>() {
                         @Override
                         public void execute(Transaction tx) throws IOException {
-                            value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                            value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                            value.orderIndex.lowPriorityIndex.load(tx);
-
-                            value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
-                            value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-                            value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
-                            value.orderIndex.highPriorityIndex.load(tx);
+                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
+                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+                            value.subLocations.load(tx);
                         }
                     });
+                }
+            }
+            if (metadata.version >= 2) {
+                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
+            } else {
+                // upgrade
+                pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                    @Override
+                    public void execute(Transaction tx) throws IOException {
+                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.lowPriorityIndex.load(tx);
+
+                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
+                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+                        value.orderIndex.highPriorityIndex.load(tx);
+                    }
+                });
             }
 
             return value;
@@ -1772,6 +1914,7 @@ public abstract class MessageDatabase ex
                 dataOut.writeLong(value.subscriptions.getPageId());
                 dataOut.writeLong(value.subscriptionAcks.getPageId());
                 dataOut.writeLong(value.ackPositions.getHeadPageId());
+                dataOut.writeLong(value.subLocations.getHeadPageId());
             } else {
                 dataOut.writeBoolean(false);
             }
@@ -1840,6 +1983,7 @@ public abstract class MessageDatabase ex
                 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
                 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
                 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
+                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
             }
             metadata.destinations.put(tx, key, rc);
         }
@@ -1873,6 +2017,10 @@ public abstract class MessageDatabase ex
             rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
             rc.ackPositions.load(tx);
 
+            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
+            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
+            rc.subLocations.load(tx);
+
             rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
 
             if (metadata.version < 3) {

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java Mon Apr 15 18:08:15 2013
@@ -18,6 +18,7 @@ package org.apache.activemq.store.kahadb
 
 import java.io.IOException;
 
+import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
 import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
@@ -53,8 +54,10 @@ public class Visitor {
 
     public void visit(KahaSubscriptionCommand kahaUpdateSubscriptionCommand) throws IOException {
     }
-    
+
     public void visit(KahaProducerAuditCommand kahaProducerAuditCommand) throws IOException {
     }
 
+    public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
+    }
 }

Modified: activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/proto/journal-data.proto Mon Apr 15 18:08:15 2013
@@ -30,6 +30,7 @@ enum KahaEntryType {
   KAHA_REMOVE_DESTINATION_COMMAND = 6;
   KAHA_SUBSCRIPTION_COMMAND = 7;
   KAHA_PRODUCER_AUDIT_COMMAND = 8;
+  KAHA_ACK_MESSAGE_FILE_MAP_COMMAND = 9;
 }
 
 message KahaTraceCommand {
@@ -40,7 +41,7 @@ message KahaTraceCommand {
   //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaTraceCommand>";
   //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
   //| option java_type_method = "KahaEntryType";
-  
+
   required string message = 1;
 }
 
@@ -48,7 +49,7 @@ message KahaAddMessageCommand {
   //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAddMessageCommand>";
   //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
   //| option java_type_method = "KahaEntryType";
-  
+
   optional KahaTransactionInfo transaction_info=1;
   required KahaDestination destination = 2;
   required string messageId = 3;
@@ -120,10 +121,22 @@ message KahaProducerAuditCommand {
   //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaProducerAuditCommand>";
   //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
   //| option java_type_method = "KahaEntryType";
-  
+
   required bytes audit = 1;
 }
 
+message KahaAckMessageFileMapCommand {
+  // We make use of the wonky comment style bellow because the following options
+  // are not valid for protoc, but they are valid for the ActiveMQ proto compiler.
+  // In the ActiveMQ proto compiler, comments terminate with the pipe character: |
+
+  //| option java_implments = "org.apache.activemq.store.kahadb.JournalCommand<KahaAckMessageFileMapCommand>";
+  //| option java_visitor = "org.apache.activemq.store.kahadb.Visitor:void:java.io.IOException";
+  //| option java_type_method = "KahaEntryType";
+
+  required bytes ackMessageFileMap = 1;
+}
+
 message KahaDestination {
   enum DestinationType {
     QUEUE = 0;
@@ -154,8 +167,8 @@ message KahaXATransactionId {
 }
 
 message KahaLocation {
-  required int32 log_id = 1;  
-  required int32 offset = 2;  
+  required int32 log_id = 1;
+  required int32 offset = 2;
 }
 
 // TODO things to ponder

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java Mon Apr 15 18:08:15 2013
@@ -16,14 +16,13 @@
  */
 package org.apache.activemq.bugs;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -31,51 +30,95 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.Topic;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQ2832Test {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQ2832Test.class);
 
     BrokerService broker = null;
+    private ActiveMQConnectionFactory cf;
     private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+    private String connectionUri;
+
+    protected void startBroker() throws Exception {
+        doStartBroker(true, false);
+    }
+
+    protected void restartBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        doStartBroker(false, false);
+    }
 
-    protected void startBroker(boolean delete) throws Exception {
+    protected void recoverBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        doStartBroker(false, true);
+    }
+
+    private void doStartBroker(boolean delete, boolean recover) throws Exception {
         broker = new BrokerService();
         broker.setDeleteAllMessagesOnStartup(delete);
         broker.setPersistent(true);
-        broker.setUseJmx(false);
+        broker.setUseJmx(true);
         broker.addConnector("tcp://localhost:0");
 
-        configurePersistence(broker, delete);
+        configurePersistence(broker, recover);
+
+        connectionUri = "vm://localhost?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
 
         broker.start();
         LOG.info("Starting broker..");
     }
 
-    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+    protected void configurePersistence(BrokerService brokerService, boolean recover) throws Exception {
         KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
 
         // ensure there are a bunch of data files but multiple entries in each
         adapter.setJournalMaxFileLength(1024 * 20);
 
         // speed up the test case, checkpoint an cleanup early and often
-        adapter.setCheckpointInterval(500);
-        adapter.setCleanupInterval(500);
+        adapter.setCheckpointInterval(5000);
+        adapter.setCleanupInterval(5000);
 
-        if (!deleteAllOnStart) {
+        if (recover) {
             adapter.setForceRecoverIndex(true);
         }
+    }
 
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
     }
 
     @Test
     public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
 
-        startBroker(true);
+        startBroker();
 
         StagedConsumer consumer = new StagedConsumer();
         int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
@@ -102,9 +145,9 @@ public class AMQ2832Test {
         broker.stop();
         broker.waitUntilStopped();
 
-        startBroker(false);
+        recoverBroker();
 
-        consumer = new StagedConsumer();     
+        consumer = new StagedConsumer();
         // need to force recovery?
 
         Message msg = consumer.receive(1, 5);
@@ -115,10 +158,99 @@ public class AMQ2832Test {
         msg = consumer.receive(1, 5);
         assertEquals("Only one messages left after recovery: " + msg, null, msg);
         consumer.close();
+    }
 
+    @Test
+    public void testAlternateLossScenario() throws Exception {
+
+        startBroker();
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // This ensure that data file 1 never goes away.
+        createInactiveDurableSub(topic);
+        assertEquals(1, getNumberOfJournalFiles());
+
+        // One Queue Message that will be acked in another data file.
+        produceMessages(queue, 1);
+        assertEquals(1, getNumberOfJournalFiles());
+
+        // Add some messages to consume space
+        produceMessages(disposable, 50);
+
+        int dataFilesCount = getNumberOfJournalFiles();
+        assertTrue(dataFilesCount > 1);
+
+        // Create an ack for the single message on this queue
+        drainQueue(queue);
+
+        // Add some more messages to consume space beyond tha data file with the ack
+        produceMessages(disposable, 50);
+
+        assertTrue(dataFilesCount < getNumberOfJournalFiles());
+        dataFilesCount = getNumberOfJournalFiles();
+
+        restartBroker();
+
+        // Clear out all queue data
+        broker.getAdminView().removeQueue(disposable.getQueueName());
+
+        // Once this becomes true our ack could be lost.
+        assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(3)));
+
+        // Recover and the Message should not be replayed but if the old MessageAck is lost
+        // then it could be.
+        recoverBroker();
+
+        assertTrue(drainQueue(queue) == 0);
     }
 
-    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+    private int getNumberOfJournalFiles() throws IOException {
+        Collection<DataFile> files =
+            ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+
+        return reality;
+    }
+
+    private void createInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+        consumer.close();
+        connection.close();
+        produceMessages(topic, 1);
+    }
+
+    private int drainQueue(Queue queue) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        int count = 0;
+        while (consumer.receive(5000) != null) {
+            count++;
+        }
+        consumer.close();
+        connection.close();
+        return count;
+    }
+
+    private int produceMessages(Destination destination, int numToSend) throws Exception {
         int sent = 0;
         Connection connection = new ActiveMQConnectionFactory(
                 broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
@@ -133,10 +265,14 @@ public class AMQ2832Test {
         } finally {
             connection.close();
         }
-        
+
         return sent;
     }
 
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+        return produceMessages(destination, numToSend);
+    }
+
     final String payload = new String(new byte[1024]);
 
     private Message createMessage(Session session, int i) throws Exception {

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java?rev=1468171&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java Mon Apr 15 18:08:15 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4212Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class);
+
+    private BrokerService service;
+    private String connectionUri;
+    private ActiveMQConnectionFactory cf;
+
+    private final int MSG_COUNT = 256;
+
+    @Before
+    public void setUp() throws IOException, Exception {
+        createBroker(true, false);
+    }
+
+    public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception {
+        service = new BrokerService();
+        service.setBrokerName("InactiveSubTest");
+        service.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        service.setAdvisorySupport(false);
+        service.setPersistent(true);
+        service.setUseJmx(true);
+        service.setKeepDurableSubsActive(false);
+
+        KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter();
+        File dataFile=new File("KahaDB");
+        pa.setDirectory(dataFile);
+        pa.setJournalMaxFileLength(10*1024);
+        pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5));
+        pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5));
+        pa.setForceRecoverIndex(recover);
+
+        service.setPersistenceAdapter(pa);
+        service.start();
+        service.waitUntilStarted();
+
+        connectionUri = "vm://InactiveSubTest?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false, false);
+    }
+
+    private void recoverBroker() throws Exception {
+        stopBroker();
+        createBroker(false, true);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (service != null) {
+            service.stop();
+            service.waitUntilStopped();
+            service = null;
+        }
+    }
+
+    @Test
+    public void testDirableSubPrefetchRecovered() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        assertTrue("Less than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 2;
+            }
+        }, TimeUnit.MINUTES.toMillis(2)));
+
+        LOG.info("Sending {} Messages to the Topic.", MSG_COUNT);
+        // Send some messages to the inactive destination
+        sendMessages(topic);
+
+        LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT);
+        assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic));
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovering the broker.");
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+    }
+
+    @Test
+    public void testDurableAcksNotDropped() throws Exception {
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // Create durable sub in first data file.
+        createInactiveDurableSub(topic);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Send to a Topic
+        sendMessages(topic, 1);
+
+        // Send to a Queue to create some journal files
+        sendMessages(queue);
+
+        LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        // Consume all the Messages leaving acks behind.
+        consumeDurableMessages(topic, 1);
+
+        LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        // Now send some more to the queue to create even more files.
+        sendMessages(queue);
+
+        LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        LOG.info("Restarting the broker.");
+        restartBroker();
+        LOG.info("Restarted the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+        assertTrue(getNumberOfJournalFiles() > 1);
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // Clear out all queue data
+        service.getAdminView().removeQueue(queue.getQueueName());
+
+        assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(3)));
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        LOG.info("Recovering the broker.");
+        recoverBroker();
+        LOG.info("Recovering the broker.");
+
+        LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles());
+
+        assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers();
+                return subs != null && subs.length == 1 ? true : false;
+            }
+        }));
+
+        // See if we receive any message they should all be acked.
+        tryConsumeExpectNone(topic);
+
+        assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() == 1;
+            }
+        }, TimeUnit.MINUTES.toMillis(1)));
+    }
+
+    private int getNumberOfJournalFiles() throws IOException {
+        Collection<DataFile> files =
+            ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+
+        return reality;
+    }
+
+    private void createInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+        consumer.close();
+        connection.close();
+    }
+
+    private void consumeDurableMessages(Topic topic, int count) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+        connection.start();
+        for (int i = 0; i < count; ++i) {
+           if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) {
+               fail("should have received a message");
+           }
+        }
+        consumer.close();
+        connection.close();
+    }
+
+    private void tryConsumeExpectNone(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+        connection.start();
+        if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) {
+            fail("Should be no messages for this durable.");
+        }
+        consumer.close();
+        connection.close();
+    }
+
+    private int consumeFromInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive");
+
+        int count = 0;
+
+        while (consumer.receive(10000) != null) {
+            count++;
+        }
+
+        consumer.close();
+        connection.close();
+
+        return count;
+    }
+
+    private void sendMessages(Destination destination) throws Exception {
+        sendMessages(destination, MSG_COUNT);
+    }
+
+    private void sendMessages(Destination destination, int count) throws Exception {
+        Connection connection = cf.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < count; ++i) {
+            TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination);
+            producer.send(message);
+        }
+        connection.close();
+    }
+
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java Mon Apr 15 18:08:15 2013
@@ -16,17 +16,26 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import java.io.File;
+import java.io.IOException;
+import java.security.ProtectionDomain;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
 import junit.framework.TestCase;
-import org.apache.activemq.broker.BrokerService;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.util.IOHelper;
-
-import javax.jms.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.security.ProtectionDomain;
 
 /**
  * @author chirino
@@ -43,36 +52,36 @@ public class KahaDBVersionTest extends T
     }
 
     static final Logger LOG = LoggerFactory.getLogger(KahaDBVersionTest.class);
-    final static File VERSION_1_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
-    final static File VERSION_2_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
-    final static File VERSION_3_DB= new File(basedir+"/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+    final static File VERSION_1_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
+    final static File VERSION_2_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
+    final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
+    final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
 
     BrokerService broker = null;
 
     protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
-
         broker = new BrokerService();
         broker.setUseJmx(false);
         broker.setPersistenceAdapter(kaha);
         broker.start();
         return broker;
-
     }
 
+    @Override
     protected void tearDown() throws Exception {
         if (broker != null) {
             broker.stop();
         }
     }
-        
+
     public void XtestCreateStore() throws Exception {
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
-        File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersionX");
+        File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
         IOHelper.deleteFile(dir);
         kaha.setDirectory(dir);
-        kaha.setJournalMaxFileLength(1024*1024);
+        kaha.setJournalMaxFileLength(1024 * 1024);
         BrokerService broker = createBroker(kaha);
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");        
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost");
         Connection connection = cf.createConnection();
         connection.setClientID("test");
         connection.start();
@@ -85,33 +94,37 @@ public class KahaDBVersionTest extends T
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Topic topic = session.createTopic("test.topic");
         Queue queue = session.createQueue("test.queue");
-        MessageConsumer consumer = session.createDurableSubscriber(topic,"test");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, "test");
         consumer.close();
         MessageProducer producer = session.createProducer(topic);
         producer.setPriority(9);
-        for (int i =0; i < numToSend; i++) {
-            Message msg = session.createTextMessage("test message:"+i);
+        for (int i = 0; i < numToSend; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
             producer.send(msg);
         }
-        LOG.info("sent "  + numToSend +" to topic");
+        LOG.info("sent " + numToSend + " to topic");
         producer = session.createProducer(queue);
-        for (int i =0; i < numToSend; i++) {
-            Message msg = session.createTextMessage("test message:"+i);
+        for (int i = 0; i < numToSend; i++) {
+            Message msg = session.createTextMessage("test message:" + i);
             producer.send(msg);
         }
-        LOG.info("sent " + numToSend +" to queue");
+        LOG.info("sent " + numToSend + " to queue");
+    }
+
+    public void testVersion1Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_1_DB);
     }
 
-    public void testVersion1Conversion() throws Exception{
-          doConvertRestartCycle(VERSION_1_DB);
+    public void testVersion2Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_2_DB);
     }
 
-    public void testVersion2Conversion() throws Exception{
-          doConvertRestartCycle(VERSION_2_DB);
+    public void testVersion3Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_3_DB);
     }
 
-    public void testVersion3Conversion() throws Exception{
-          doConvertRestartCycle(VERSION_3_DB);
+    public void testVersion4Conversion() throws Exception {
+        doConvertRestartCycle(VERSION_4_DB);
     }
 
     public void doConvertRestartCycle(File existingStore) throws Exception {
@@ -145,7 +158,7 @@ public class KahaDBVersionTest extends T
             for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
                 TextMessage msg = (TextMessage) queueConsumer.receive(10000);
                 count++;
-                //System.err.println(msg.getText());
+                // System.err.println(msg.getText());
                 assertNotNull(msg);
             }
             LOG.info("Consumed " + count + " from queue");
@@ -154,12 +167,12 @@ public class KahaDBVersionTest extends T
             for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
                 TextMessage msg = (TextMessage) topicConsumer.receive(10000);
                 count++;
-                //System.err.println(msg.getText());
+                // System.err.println(msg.getText());
                 assertNotNull(msg);
             }
             LOG.info("Consumed " + count + " from topic");
             connection.close();
-            
+
             broker.stop();
         }
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java?rev=1468171&r1=1468170&r2=1468171&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java Mon Apr 15 18:08:15 2013
@@ -20,13 +20,16 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
@@ -44,8 +47,9 @@ public class DurableSubsOfflineSelectorC
     public int messageCount = 10000;
     private BrokerService broker;
     private ActiveMQTopic topic;
-    private List<Throwable> exceptions = new ArrayList<Throwable>();
+    private final List<Throwable> exceptions = new ArrayList<Throwable>();
 
+    @Override
     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
         connectionFactory.setWatchTopicAdvisories(false);
@@ -68,6 +72,7 @@ public class DurableSubsOfflineSelectorC
         return suite(DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.class);
     }
 
+    @Override
     protected void setUp() throws Exception {
         exceptions.clear();
         topic = (ActiveMQTopic) createDestination();
@@ -75,6 +80,7 @@ public class DurableSubsOfflineSelectorC
         super.setUp();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         destroyBroker();
@@ -128,6 +134,7 @@ public class DurableSubsOfflineSelectorC
 
         final CountDownLatch goOn = new CountDownLatch(1);
         Thread sendThread = new Thread() {
+            @Override
             public void run() {
                 try {
 
@@ -208,10 +215,10 @@ public class DurableSubsOfflineSelectorC
             LOG.info("Store free page count: " + store.getPageFile().getFreePageCount());
             LOG.info("Store page in-use: " + (store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount()));
 
-            assertTrue("no leak of pages, always use just 10", Wait.waitFor(new Wait.Condition() {
+            assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
                 @Override
                 public boolean isSatisified() throws Exception {
-                    return 10 == store.getPageFile().getPageCount() -
+                    return 11 == store.getPageFile().getPageCount() -
                             store.getPageFile().getFreePageCount();
                 }
             }, TimeUnit.SECONDS.toMillis(10)));
@@ -236,6 +243,7 @@ public class DurableSubsOfflineSelectorC
         Listener() {
         }
 
+        @Override
         public void onMessage(Message message) {
             count++;
             if (id != null) {