You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/05/09 18:11:50 UTC

svn commit: r1101085 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/store/kahadb/plist/ main/java/org/apache/activemq/thread/ test/java/org/apache/activemq/bugs/ test/java/org/apache/activemq...

Author: gtully
Date: Mon May  9 16:11:50 2011
New Revision: 1101085

URL: http://svn.apache.org/viewvc?rev=1101085&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3310 - IOException PListStore "Could not locate data file" from FilePendingMessageCursor. Issue with reference counting and async location initialsation resulting in inuse data file removal and data file leaking. Pulled cleanup into task that periodically queries lists for references

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon May  9 16:11:50 2011
@@ -1453,6 +1453,7 @@ public class BrokerService implements Se
                 }
                 this.tempDataStore = new PListStore();
                 this.tempDataStore.setDirectory(getTmpDataDirectory());
+                configureService(tempDataStore);
                 this.tempDataStore.start();
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -1467,6 +1468,7 @@ public class BrokerService implements Se
      */
     public void setTempDataStore(PListStore tempDataStore) {
         this.tempDataStore = tempDataStore;
+        configureService(tempDataStore);
         try {
             tempDataStore.start();
         } catch (Exception e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Mon May  9 16:11:50 2011
@@ -175,8 +175,6 @@ public class PList {
         EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
         entry.setLocation(location);
         storeEntry(tx, entry);
-        this.store.incrementJournalCount(tx, location);
-
         EntryLocation last = loadEntry(tx, this.lastId);
         last.setNext(entry.getPage().getPageId());
         storeEntry(tx, last);
@@ -210,7 +208,6 @@ public class PList {
         storeEntry(tx, root);
         storeEntry(tx, entry);
 
-        this.store.incrementJournalCount(tx, location);
         this.size++;
     }
 
@@ -433,7 +430,6 @@ public class PList {
                 storeEntry(tx, prev);
             }
 
-            this.store.decrementJournalCount(tx, entry.getLocation());
             entry.reset();
             storeEntry(tx, entry);
             tx.free(entry.getPage().getPageId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Mon May  9 16:11:50 2011
@@ -22,17 +22,18 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
@@ -40,15 +41,16 @@ import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
 import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.IntegerMarshaller;
 import org.apache.kahadb.util.LockFile;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
  */
-public class PListStore extends ServiceSupport {
+public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
     static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
@@ -69,11 +71,18 @@ public class PListStore extends ServiceS
     final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
     Map<String, PList> persistentLists = new HashMap<String, PList>();
     final Object indexLock = new Object();
+    private Scheduler scheduler;
+    private long cleanupInterval = 30000;
 
     public Object getIndexLock() {
         return indexLock;
     }
 
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.scheduler = brokerService.getScheduler();
+    }
+
     protected class MetaData {
         protected MetaData(PListStore store) {
             this.store = store;
@@ -81,21 +90,16 @@ public class PListStore extends ServiceS
 
         private final PListStore store;
         Page<MetaData> page;
-        BTreeIndex<Integer, Integer> journalRC;
         BTreeIndex<String, PList> storedSchedulers;
 
         void createIndexes(Transaction tx) throws IOException {
             this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
         }
 
         void load(Transaction tx) throws IOException {
             this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
             this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
             this.storedSchedulers.load(tx);
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.load(tx);
         }
 
         void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
@@ -110,15 +114,10 @@ public class PListStore extends ServiceS
             this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
             this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
             this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
         }
 
         public void write(DataOutput os) throws IOException {
             os.writeLong(this.storedSchedulers.getPageId());
-            os.writeLong(this.journalRC.getPageId());
-
         }
     }
 
@@ -276,8 +275,15 @@ public class PListStore extends ServiceS
                         metaData.loadLists(tx, persistentLists);
                     }
                 });
-
                 this.pageFile.flush();
+
+                if (cleanupInterval > 0) {
+                    if (scheduler == null) {
+                        scheduler = new Scheduler(PListStore.class.getSimpleName());
+                        scheduler.start();
+                    }
+                    scheduler.executePeriodically(this, cleanupInterval);
+                }
                 LOG.info(this + " initialized");
             }
         }
@@ -290,6 +296,12 @@ public class PListStore extends ServiceS
 
     @Override
     protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+        if (scheduler != null) {
+            if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
+                scheduler.stop();
+                scheduler = null;
+            }
+        }
         for (PList pl : this.persistentLists.values()) {
             pl.unload();
         }
@@ -308,27 +320,29 @@ public class PListStore extends ServiceS
 
     }
 
-    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        Integer val = this.metaData.journalRC.get(tx, logId);
-        int refCount = val != null ? val.intValue() + 1 : 1;
-        this.metaData.journalRC.put(tx, logId, refCount);
-
+    public void run() {
+        try {
+            final Set<Integer> candidates = journal.getFileMap().keySet();
+            LOG.trace("Full gc candidate set:" + candidates);
+            for (PList list : persistentLists.values()) {
+                PListEntry entry = list.getFirst();
+                while (entry != null) {
+                    claimCandidates(entry, candidates);
+                    entry = list.getNext(entry);
+                }
+                LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
+            }
+            LOG.debug("GC Candidate set:" + candidates);
+            this.journal.removeDataFiles(candidates);
+        } catch (IOException e) {
+            LOG.error("Exception on periodic cleanup: " + e, e);
+        }
     }
 
-    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        if (logId != Location.NOT_SET) {
-            int refCount = this.metaData.journalRC.get(tx, logId);
-            refCount--;
-            if (refCount <= 0) {
-                this.metaData.journalRC.remove(tx, logId);
-                Set<Integer> set = new HashSet<Integer>();
-                set.add(logId);
-                this.journal.removeDataFiles(set);
-            } else {
-                this.metaData.journalRC.put(tx, logId, refCount);
-            }
+    private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
+        EntryLocation location = entry.getEntry();
+        if (location != null) {
+            candidates.remove(location.getLocation().getDataFileId());
         }
     }
 
@@ -404,6 +418,14 @@ public class PListStore extends ServiceS
         this.enableIndexWriteAsync = enableIndexWriteAsync;
     }
 
+    public long getCleanupInterval() {
+        return cleanupInterval;
+    }
+
+    public void setCleanupInterval(long cleanupInterval) {
+        this.cleanupInterval = cleanupInterval;
+    }
+
     @Override
     public String toString() {
         return "PListStore:" + this.directory;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Mon May  9 16:11:50 2011
@@ -80,4 +80,8 @@ public final class Scheduler extends Ser
        }
         
     }
+
+    public String getName() {
+        return name;
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java Mon May  9 16:11:50 2011
@@ -163,6 +163,10 @@ public class TempStorageBlockedBrokerTes
         LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
                 + broker.getSystemUsage().getTempUsage().getUsage());
 
+        // do a cleanup
+        broker.getTempDataStore().run();
+        LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+                        + broker.getSystemUsage().getTempUsage().getUsage());
 
         assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
         assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
@@ -187,6 +191,7 @@ public class TempStorageBlockedBrokerTes
         IOHelper.deleteChildren(tmpDir);
         PListStore tempStore = new PListStore();
         tempStore.setDirectory(tmpDir);
+        tempStore.setJournalMaxFileLength(50*1024);
         tempStore.start();
 
         SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Mon May  9 16:11:50 2011
@@ -25,6 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.util.IOHelper;
 import org.apache.kahadb.util.ByteSequence;
@@ -147,7 +152,78 @@ public class PListTest {
         assertTrue(plist.remove(0));
         assertFalse(plist.remove(3));
     }
-    
+
+
+    @Test
+    public void testConcurrentAddRemove() throws Exception {
+        File directory = store.getDirectory();
+        store.stop();
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        store = new PListStore();
+        store.setDirectory(directory);
+        store.setJournalMaxFileLength(1024*5);
+        store.start();
+
+        final ByteSequence payload = new ByteSequence(new byte[1024*4]);
+
+
+        final Vector<Throwable> exceptions = new Vector<Throwable>();
+        final int iterations = 1000;
+        final int numLists = 10;
+
+        final PList[] lists = new PList[numLists];
+        for (int i=0; i<numLists; i++) {
+            lists[i] = store.getPList("List" + i);
+        }
+
+        ExecutorService executor = Executors.newFixedThreadPool(100);
+        class A implements Runnable {
+            @Override
+            public void run() {
+                try {
+                    for (int i=0; i<iterations; i++) {
+                        PList candidate = lists[i%numLists];
+                        candidate.addLast(String.valueOf(i), payload);
+                        PListEntry entry = candidate.getFirst();
+                        assertTrue(candidate.remove(String.valueOf(i)));
+                    }
+                } catch (Exception error) {
+                    error.printStackTrace();
+                    exceptions.add(error);
+                }
+            }
+        };
+
+        class B implements  Runnable {
+            @Override
+            public void run() {
+                try {
+                    for (int i=0; i<iterations; i++) {
+                        PList candidate = lists[i%numLists];
+                        candidate.addLast(String.valueOf(i), payload);
+                        PListEntry entry = candidate.getFirst();
+                        assertTrue(candidate.remove(String.valueOf(i)));
+                    }
+                } catch (Exception error) {
+                    error.printStackTrace();
+                    exceptions.add(error);
+                }
+            }
+        };
+
+        executor.execute(new A());
+        executor.execute(new A());
+        executor.execute(new A());
+        executor.execute(new B());
+        executor.execute(new B());
+        executor.execute(new B());
+
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
 
     @Before
     public void setUp() throws Exception {