You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/05/09 18:11:50 UTC
svn commit: r1101085 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/store/kahadb/plist/
main/java/org/apache/activemq/thread/ test/java/org/apache/activemq/bugs/
test/java/org/apache/activemq...
Author: gtully
Date: Mon May 9 16:11:50 2011
New Revision: 1101085
URL: http://svn.apache.org/viewvc?rev=1101085&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3310 - IOException PListStore "Could not locate data file" from FilePendingMessageCursor. Issue with reference counting and async location initialsation resulting in inuse data file removal and data file leaking. Pulled cleanup into task that periodically queries lists for references
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon May 9 16:11:50 2011
@@ -1453,6 +1453,7 @@ public class BrokerService implements Se
}
this.tempDataStore = new PListStore();
this.tempDataStore.setDirectory(getTmpDataDirectory());
+ configureService(tempDataStore);
this.tempDataStore.start();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -1467,6 +1468,7 @@ public class BrokerService implements Se
*/
public void setTempDataStore(PListStore tempDataStore) {
this.tempDataStore = tempDataStore;
+ configureService(tempDataStore);
try {
tempDataStore.start();
} catch (Exception e) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Mon May 9 16:11:50 2011
@@ -175,8 +175,6 @@ public class PList {
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
- this.store.incrementJournalCount(tx, location);
-
EntryLocation last = loadEntry(tx, this.lastId);
last.setNext(entry.getPage().getPageId());
storeEntry(tx, last);
@@ -210,7 +208,6 @@ public class PList {
storeEntry(tx, root);
storeEntry(tx, entry);
- this.store.incrementJournalCount(tx, location);
this.size++;
}
@@ -433,7 +430,6 @@ public class PList {
storeEntry(tx, prev);
}
- this.store.decrementJournalCount(tx, entry.getLocation());
entry.reset();
storeEntry(tx, entry);
tx.free(entry.getPage().getPageId());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Mon May 9 16:11:50 2011
@@ -22,17 +22,18 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
@@ -40,15 +41,16 @@ import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.IntegerMarshaller;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
*/
-public class PListStore extends ServiceSupport {
+public class PListStore extends ServiceSupport implements BrokerServiceAware, Runnable {
static final Logger LOG = LoggerFactory.getLogger(PListStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -69,11 +71,18 @@ public class PListStore extends ServiceS
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
final Object indexLock = new Object();
+ private Scheduler scheduler;
+ private long cleanupInterval = 30000;
public Object getIndexLock() {
return indexLock;
}
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.scheduler = brokerService.getScheduler();
+ }
+
protected class MetaData {
protected MetaData(PListStore store) {
this.store = store;
@@ -81,21 +90,16 @@ public class PListStore extends ServiceS
private final PListStore store;
Page<MetaData> page;
- BTreeIndex<Integer, Integer> journalRC;
BTreeIndex<String, PList> storedSchedulers;
void createIndexes(Transaction tx) throws IOException {
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
this.storedSchedulers.load(tx);
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.load(tx);
}
void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
@@ -110,15 +114,10 @@ public class PListStore extends ServiceS
this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
}
public void write(DataOutput os) throws IOException {
os.writeLong(this.storedSchedulers.getPageId());
- os.writeLong(this.journalRC.getPageId());
-
}
}
@@ -276,8 +275,15 @@ public class PListStore extends ServiceS
metaData.loadLists(tx, persistentLists);
}
});
-
this.pageFile.flush();
+
+ if (cleanupInterval > 0) {
+ if (scheduler == null) {
+ scheduler = new Scheduler(PListStore.class.getSimpleName());
+ scheduler.start();
+ }
+ scheduler.executePeriodically(this, cleanupInterval);
+ }
LOG.info(this + " initialized");
}
}
@@ -290,6 +296,12 @@ public class PListStore extends ServiceS
@Override
protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+ if (scheduler != null) {
+ if (PListStore.class.getSimpleName().equals(scheduler.getName())) {
+ scheduler.stop();
+ scheduler = null;
+ }
+ }
for (PList pl : this.persistentLists.values()) {
pl.unload();
}
@@ -308,27 +320,29 @@ public class PListStore extends ServiceS
}
- synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- Integer val = this.metaData.journalRC.get(tx, logId);
- int refCount = val != null ? val.intValue() + 1 : 1;
- this.metaData.journalRC.put(tx, logId, refCount);
-
+ public void run() {
+ try {
+ final Set<Integer> candidates = journal.getFileMap().keySet();
+ LOG.trace("Full gc candidate set:" + candidates);
+ for (PList list : persistentLists.values()) {
+ PListEntry entry = list.getFirst();
+ while (entry != null) {
+ claimCandidates(entry, candidates);
+ entry = list.getNext(entry);
+ }
+ LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
+ }
+ LOG.debug("GC Candidate set:" + candidates);
+ this.journal.removeDataFiles(candidates);
+ } catch (IOException e) {
+ LOG.error("Exception on periodic cleanup: " + e, e);
+ }
}
- synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- if (logId != Location.NOT_SET) {
- int refCount = this.metaData.journalRC.get(tx, logId);
- refCount--;
- if (refCount <= 0) {
- this.metaData.journalRC.remove(tx, logId);
- Set<Integer> set = new HashSet<Integer>();
- set.add(logId);
- this.journal.removeDataFiles(set);
- } else {
- this.metaData.journalRC.put(tx, logId, refCount);
- }
+ private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
+ EntryLocation location = entry.getEntry();
+ if (location != null) {
+ candidates.remove(location.getLocation().getDataFileId());
}
}
@@ -404,6 +418,14 @@ public class PListStore extends ServiceS
this.enableIndexWriteAsync = enableIndexWriteAsync;
}
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ public void setCleanupInterval(long cleanupInterval) {
+ this.cleanupInterval = cleanupInterval;
+ }
+
@Override
public String toString() {
return "PListStore:" + this.directory;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/thread/Scheduler.java Mon May 9 16:11:50 2011
@@ -80,4 +80,8 @@ public final class Scheduler extends Ser
}
}
+
+ public String getName() {
+ return name;
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java Mon May 9 16:11:50 2011
@@ -163,6 +163,10 @@ public class TempStorageBlockedBrokerTes
LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ broker.getSystemUsage().getTempUsage().getUsage());
+ // do a cleanup
+ broker.getTempDataStore().run();
+ LOG.info("Subscrition Usage: " + tempUsageBySubscription + ", endUsage: "
+ + broker.getSystemUsage().getTempUsage().getUsage());
assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), messagesSent.get(), MESSAGES_COUNT);
assertEquals("Incorrect number of Messages Consumed: " + messagesConsumed.get(), messagesConsumed.get(),
@@ -187,6 +191,7 @@ public class TempStorageBlockedBrokerTes
IOHelper.deleteChildren(tmpDir);
PListStore tempStore = new PListStore();
tempStore.setDirectory(tmpDir);
+ tempStore.setJournalMaxFileLength(50*1024);
tempStore.start();
SystemUsage sysUsage = new SystemUsage("mySysUsage", persistence, tempStore);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1101085&r1=1101084&r2=1101085&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Mon May 9 16:11:50 2011
@@ -25,6 +25,11 @@ import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.util.IOHelper;
import org.apache.kahadb.util.ByteSequence;
@@ -147,7 +152,78 @@ public class PListTest {
assertTrue(plist.remove(0));
assertFalse(plist.remove(3));
}
-
+
+
+ @Test
+ public void testConcurrentAddRemove() throws Exception {
+ File directory = store.getDirectory();
+ store.stop();
+ IOHelper.mkdirs(directory);
+ IOHelper.deleteChildren(directory);
+ store = new PListStore();
+ store.setDirectory(directory);
+ store.setJournalMaxFileLength(1024*5);
+ store.start();
+
+ final ByteSequence payload = new ByteSequence(new byte[1024*4]);
+
+
+ final Vector<Throwable> exceptions = new Vector<Throwable>();
+ final int iterations = 1000;
+ final int numLists = 10;
+
+ final PList[] lists = new PList[numLists];
+ for (int i=0; i<numLists; i++) {
+ lists[i] = store.getPList("List" + i);
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(100);
+ class A implements Runnable {
+ @Override
+ public void run() {
+ try {
+ for (int i=0; i<iterations; i++) {
+ PList candidate = lists[i%numLists];
+ candidate.addLast(String.valueOf(i), payload);
+ PListEntry entry = candidate.getFirst();
+ assertTrue(candidate.remove(String.valueOf(i)));
+ }
+ } catch (Exception error) {
+ error.printStackTrace();
+ exceptions.add(error);
+ }
+ }
+ };
+
+ class B implements Runnable {
+ @Override
+ public void run() {
+ try {
+ for (int i=0; i<iterations; i++) {
+ PList candidate = lists[i%numLists];
+ candidate.addLast(String.valueOf(i), payload);
+ PListEntry entry = candidate.getFirst();
+ assertTrue(candidate.remove(String.valueOf(i)));
+ }
+ } catch (Exception error) {
+ error.printStackTrace();
+ exceptions.add(error);
+ }
+ }
+ };
+
+ executor.execute(new A());
+ executor.execute(new A());
+ executor.execute(new A());
+ executor.execute(new B());
+ executor.execute(new B());
+ executor.execute(new B());
+
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
@Before
public void setUp() throws Exception {