You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/06/02 17:28:31 UTC
svn commit: r1130607 [1/2] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/broker/region/
activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/
activemq-core...
Author: gtully
Date: Thu Jun 2 15:28:30 2011
New Revision: 1130607
URL: http://svn.apache.org/viewvc?rev=1130607&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3351 - Usage of the temp store index by the PList needs the be improved. new implementation puts the max entries in a page, reading/writing requires substantially less page access and disk access when pending messages build up
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/
activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/
activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java (with props)
Removed:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/EntryLocation.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListNode.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/ListIndexTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jun 2 15:28:30 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
+import java.net.SocketException;
import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
@@ -228,13 +229,22 @@ public class TransportConnection impleme
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e);
- } else if (TRANSPORTLOG.isInfoEnabled()) {
+ } else if (TRANSPORTLOG.isInfoEnabled() && !expected(e)) {
TRANSPORTLOG.info("Transport failed: " + e);
}
stopAsync();
}
}
+ private boolean expected(IOException e) {
+ return e instanceof SocketException && isStomp() && e.getMessage().indexOf("reset") != -1;
+ }
+
+ private boolean isStomp() {
+ URI uri = connector.getUri();
+ return uri != null && uri.getScheme() != null && uri.getScheme().indexOf("stomp") != -1;
+ }
+
/**
* Calls the serviceException method in an async thread. Since handling a
* service exception closes a socket, we should not tie up broker threads
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Thu Jun 2 15:28:30 2011
@@ -583,12 +583,12 @@ public abstract class BaseDestination im
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (systemUsage.isSendFailIfNoSpace()) {
- getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
+ getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark)) {
- getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send: " + warning);
+ getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
}
} else {
@@ -601,7 +601,7 @@ public abstract class BaseDestination im
long now = System.currentTimeMillis();
if (now >= nextWarn) {
- getLog().info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
+ getLog().info("" + usage + ": " + warning + " (blocking for: " + (now - start) / 1000 + "s)");
nextWarn = now + blockedProducerWarningInterval;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Jun 2 15:28:30 2011
@@ -631,7 +631,7 @@ public class Queue extends BaseDestinati
} else {
if (memoryUsage.isFull()) {
- waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
+ waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+ message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -738,7 +738,7 @@ public class Queue extends BaseDestinati
private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
- final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+ message.getProducerId() + ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
@@ -747,7 +747,7 @@ public class Queue extends BaseDestinati
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
} else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
- final String logMessage = "Usage Manager Temp Store is Full ("
+ final String logMessage = "Temp Store is Full ("
+ systemUsage.getTempUsage().getPercentUsage() + "% of " + systemUsage.getTempUsage().getLimit()
+"). Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jun 2 15:28:30 2011
@@ -293,10 +293,7 @@ public class Topic extends BaseDestinati
if (warnOnProducerFlowControl) {
warnOnProducerFlowControl = false;
- LOG
- .info("Usage Manager memory limit ("
- + memoryUsage.getLimit()
- + ") reached for "
+ LOG.info(memoryUsage + ", Usage Manager memory limit reached for "
+ getActiveMQDestination().getQualifiedName()
+ ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@@ -304,7 +301,7 @@ public class Topic extends BaseDestinati
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("
- + memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId()
+ + memoryUsage.getLimit() + ") reached. Rejecting send for producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
}
@@ -379,7 +376,7 @@ public class Topic extends BaseDestinati
waitForSpace(
context,
memoryUsage,
- "Usage Manager memory limit reached. Stopping producer ("
+ "Usage Manager Memory Usage limit reached. Stopping producer ("
+ message.getProducerId()
+ ") to prevent flooding "
+ getActiveMQDestination().getQualifiedName()
@@ -427,7 +424,7 @@ public class Topic extends BaseDestinati
if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
- final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
+ systemUsage.getStoreUsage().getLimit() + ". Stopping producer (" + message.getProducerId()
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Thu Jun 2 15:28:30 2011
@@ -135,7 +135,9 @@ public class FilePendingMessageCursor ex
iterating = false;
if (flushRequired) {
flushRequired = false;
- flushToDisk();
+ if (!hasSpace()) {
+ flushToDisk();
+ }
}
}
@@ -151,8 +153,9 @@ public class FilePendingMessageCursor ex
}
private void destroyDiskList() throws Exception {
- if (!isDiskListEmpty()) {
+ if (diskList != null) {
store.removePList(name);
+ diskList = null;
}
}
@@ -335,7 +338,7 @@ public class FilePendingMessageCursor ex
*/
@Override
public synchronized int size() {
- return memoryList.size() + (isDiskListEmpty() ? 0 : getDiskList().size());
+ return memoryList.size() + (isDiskListEmpty() ? 0 : (int)getDiskList().size());
}
/**
@@ -374,12 +377,14 @@ public class FilePendingMessageCursor ex
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
- flushRequired = true;
- if (!iterating) {
- expireOldMessages();
- if (!hasSpace()) {
- flushToDisk();
- flushRequired = false;
+ if (!flushRequired) {
+ flushRequired =true;
+ if (!iterating) {
+ expireOldMessages();
+ if (!hasSpace()) {
+ flushToDisk();
+ flushRequired = false;
+ }
}
}
}
@@ -412,8 +417,12 @@ public class FilePendingMessageCursor ex
}
protected synchronized void flushToDisk() {
-
if (!memoryList.isEmpty()) {
+ long start = 0;
+ if (LOG.isTraceEnabled()) {
+ start = System.currentTimeMillis();
+ LOG.trace("" + name + ", flushToDisk() mem list size: " +memoryList.size() + " " + (systemUsage != null ? systemUsage.getMemoryUsage() : "") );
+ }
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
node.decrementReferenceCount();
@@ -429,6 +438,9 @@ public class FilePendingMessageCursor ex
}
memoryList.clear();
setCacheEnabled(false);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("" + name + ", flushToDisk() done - " + (System.currentTimeMillis() - start) + "ms " + (systemUsage != null ? systemUsage.getMemoryUsage() : ""));
+ }
}
}
@@ -471,35 +483,23 @@ public class FilePendingMessageCursor ex
}
final class DiskIterator implements Iterator<MessageReference> {
- private PListEntry next = null;
- private PListEntry current = null;
- PList list;
-
+ private final Iterator<PListEntry> iterator;
DiskIterator() {
try {
- this.list = getDiskList();
- synchronized (this.list) {
- this.current = this.list.getFirst();
- this.next = this.current;
- }
+ iterator = getDiskList().iterator();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public boolean hasNext() {
- return this.next != null;
+ return iterator.hasNext();
}
public MessageReference next() {
- this.current = next;
try {
- ByteSequence bs = this.current.getByteSequence();
- synchronized (this.list) {
- this.current = this.list.refresh(this.current);
- this.next = this.list.getNext(this.current);
- }
- return getMessage(bs);
+ PListEntry entry = iterator.next();
+ return getMessage(entry.getByteSequence());
} catch (IOException e) {
LOG.error("I/O error", e);
throw new RuntimeException(e);
@@ -507,17 +507,7 @@ public class FilePendingMessageCursor ex
}
public void remove() {
- try {
- synchronized (this.list) {
- this.current = this.list.refresh(this.current);
- this.list.remove(this.current);
- }
-
- } catch (IOException e) {
- LOG.error("I/O error", e);
- throw new RuntimeException(e);
- }
-
+ iterator.remove();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Thu Jun 2 15:28:30 2011
@@ -36,7 +36,6 @@ import java.util.concurrent.locks.Reentr
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
@@ -63,6 +62,7 @@ import org.apache.activemq.util.Callback
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.kahadb.util.LocationMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kahadb.index.BTreeIndex;
@@ -816,7 +816,12 @@ public class MessageDatabase extends Ser
* @throws IOException
*/
public JournalCommand<?> load(Location location) throws IOException {
+ long start = System.currentTimeMillis();
ByteSequence data = journal.read(location);
+ long end = System.currentTimeMillis();
+ if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
+ }
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
KahaEntryType type = KahaEntryType.valueOf(readByte);
@@ -1472,34 +1477,6 @@ public class MessageDatabase extends Ser
}
}
- static class LocationMarshaller implements Marshaller<Location> {
- final static LocationMarshaller INSTANCE = new LocationMarshaller();
-
- public Location readPayload(DataInput dataIn) throws IOException {
- Location rc = new Location();
- rc.setDataFileId(dataIn.readInt());
- rc.setOffset(dataIn.readInt());
- return rc;
- }
-
- public void writePayload(Location object, DataOutput dataOut) throws IOException {
- dataOut.writeInt(object.getDataFileId());
- dataOut.writeInt(object.getOffset());
- }
-
- public int getFixedSize() {
- return 8;
- }
-
- public Location deepCopy(Location source) {
- return new Location(source);
- }
-
- public boolean isDeepCopySupported() {
- return true;
- }
- }
-
static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
@@ -1569,7 +1546,7 @@ public class MessageDatabase extends Ser
// Figure out the next key using the last entry in the destination.
rc.orderIndex.configureLast(tx);
- rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+ rc.locationIndex.setKeyMarshaller(org.apache.kahadb.util.LocationMarshaller.INSTANCE);
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
rc.locationIndex.load(tx);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Thu Jun 2 15:28:30 2011
@@ -19,505 +19,241 @@ package org.apache.activemq.store.kahadb
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.activemq.store.kahadb.plist.EntryLocation.EntryLocationMarshaller;
+import org.apache.kahadb.index.ListIndex;
+import org.apache.kahadb.index.ListNode;
import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.LocationMarshaller;
+import org.apache.kahadb.util.StringMarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PList {
+public class PList extends ListIndex<String, Location> {
static final Logger LOG = LoggerFactory.getLogger(PList.class);
final PListStore store;
private String name;
- private long rootId = EntryLocation.NOT_SET;
- private long lastId = EntryLocation.NOT_SET;
- private final AtomicBoolean loaded = new AtomicBoolean();
- private int size = 0;
Object indexLock;
PList(PListStore store) {
this.store = store;
this.indexLock = store.getIndexLock();
+ setPageFile(store.getPageFile());
+ setKeyMarshaller(StringMarshaller.INSTANCE);
+ setValueMarshaller(LocationMarshaller.INSTANCE);
}
public void setName(String name) {
this.name = name;
}
- /*
- * (non-Javadoc)
- * @see org.apache.activemq.beanstalk.JobScheduler#getName()
- */
public String getName() {
return this.name;
}
- public synchronized int size() {
- return this.size;
- }
-
- public synchronized boolean isEmpty() {
- return size == 0;
- }
-
- /**
- * @return the rootId
- */
- public long getRootId() {
- return this.rootId;
- }
-
- /**
- * @param rootId
- * the rootId to set
- */
- public void setRootId(long rootId) {
- this.rootId = rootId;
- }
-
- /**
- * @return the lastId
- */
- public long getLastId() {
- return this.lastId;
- }
-
- /**
- * @param lastId
- * the lastId to set
- */
- public void setLastId(long lastId) {
- this.lastId = lastId;
- }
-
- /**
- * @return the loaded
- */
- public boolean isLoaded() {
- return this.loaded.get();
- }
-
void read(DataInput in) throws IOException {
- this.rootId = in.readLong();
- this.name = in.readUTF();
+ this.headPageId = in.readLong();
}
public void write(DataOutput out) throws IOException {
- out.writeLong(this.rootId);
- out.writeUTF(name);
+ out.writeLong(this.headPageId);
}
public synchronized void destroy() throws IOException {
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- destroy(tx);
+ clear(tx);
+ unload(tx);
}
});
}
}
- void destroy(Transaction tx) throws IOException {
- // start from the first
- EntryLocation entry = getFirst(tx);
- while (entry != null) {
- EntryLocation toRemove = entry.copy();
- entry = getNext(tx, entry.getNext());
- doRemove(tx, toRemove);
- }
- }
-
- synchronized void load(Transaction tx) throws IOException {
- if (loaded.compareAndSet(false, true)) {
- final Page<EntryLocation> p = tx.load(this.rootId, null);
- if (p.getType() == Page.PAGE_FREE_TYPE) {
- // Need to initialize it..
- EntryLocation root = createEntry(p, "root", EntryLocation.NOT_SET, EntryLocation.NOT_SET);
-
- storeEntry(tx, root);
- this.lastId = root.getPage().getPageId();
- } else {
- // find last id
- long nextId = this.rootId;
- while (nextId != EntryLocation.NOT_SET) {
- EntryLocation next = getNext(tx, nextId);
- if (next != null) {
- this.lastId = next.getPage().getPageId();
- nextId = next.getNext();
- this.size++;
- }
- }
- }
- }
- }
-
- synchronized public void unload() {
- if (loaded.compareAndSet(true, false)) {
- this.rootId = EntryLocation.NOT_SET;
- this.lastId = EntryLocation.NOT_SET;
- this.size=0;
- }
- }
-
- synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
+ public void addLast(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- addLast(tx, id, bs, location);
+ add(tx, id, location);
}
});
}
}
- private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
- EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
- entry.setLocation(location);
- storeEntry(tx, entry);
- EntryLocation last = loadEntry(tx, this.lastId);
- last.setNext(entry.getPage().getPageId());
- storeEntry(tx, last);
- this.lastId = entry.getPage().getPageId();
- this.size++;
- }
-
- synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
+ public void addFirst(final String id, final ByteSequence bs) throws IOException {
final Location location = this.store.write(bs, false);
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- addFirst(tx, id, bs, location);
- }
- });
- }
- }
-
- private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
- EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
- entry.setLocation(location);
- EntryLocation oldFirst = getFirst(tx);
- if (oldFirst != null) {
- oldFirst.setPrev(entry.getPage().getPageId());
- storeEntry(tx, oldFirst);
- entry.setNext(oldFirst.getPage().getPageId());
-
- }
- EntryLocation root = getRoot(tx);
- root.setNext(entry.getPage().getPageId());
- storeEntry(tx, root);
- storeEntry(tx, entry);
-
- this.size++;
- }
-
- synchronized public boolean remove(final String id) throws IOException {
- final AtomicBoolean result = new AtomicBoolean();
- synchronized (indexLock) {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- result.set(remove(tx, id));
+ addFirst(tx, id, location);
}
});
}
- return result.get();
}
- synchronized public boolean remove(final int position) throws IOException {
+ public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- result.set(remove(tx, position));
+ result.set(remove(tx, id) != null);
}
});
}
return result.get();
}
- synchronized public boolean remove(final PListEntry entry) throws IOException {
+ public boolean remove(final long position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- result.set(doRemove(tx, entry.getEntry()));
+ Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
+ if (iterator.hasNext()) {
+ iterator.next();
+ iterator.remove();
+ result.set(true);
+ } else {
+ result.set(false);
+ }
}
});
}
return result.get();
}
- synchronized public PListEntry get(final int position) throws IOException {
+ public PListEntry get(final long position) throws IOException {
PListEntry result = null;
- final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+ final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- ref.set(get(tx, position));
+ Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
+ ref.set(iterator.next());
}
});
}
if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
+ ByteSequence bs = this.store.getPayload(ref.get().getValue());
+ result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
- synchronized public PListEntry getFirst() throws IOException {
+ public PListEntry getFirst() throws IOException {
PListEntry result = null;
- final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+ final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getFirst(tx));
}
});
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
- }
+ }
+ if (ref.get() != null) {
+ ByteSequence bs = this.store.getPayload(ref.get().getValue());
+ result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
- synchronized public PListEntry getLast() throws IOException {
+ public PListEntry getLast() throws IOException {
PListEntry result = null;
- final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
+ final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
synchronized (indexLock) {
this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
ref.set(getLast(tx));
}
});
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
- }
+ }
+ if (ref.get() != null) {
+ ByteSequence bs = this.store.getPayload(ref.get().getValue());
+ result = new PListEntry(ref.get().getKey(), bs);
}
return result;
}
- synchronized public PListEntry getNext(PListEntry entry) throws IOException {
- PListEntry result = null;
- final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
- if (nextId != EntryLocation.NOT_SET) {
- final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- synchronized (indexLock) {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(getNext(tx, nextId));
- }
- });
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
- }
- }
- }
- return result;
+ public boolean isEmpty() {
+ return size() == 0;
}
- synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
- PListEntry result = null;
- final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- synchronized (indexLock) {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
- }
- });
- if (ref.get() != null) {
- result = new PListEntry(ref.get(), entry.getByteSequence());
- }
- }
- return result;
+ synchronized public Iterator<PListEntry> iterator() throws IOException {
+ return new PListIterator();
}
- synchronized public void claimFileLocations(final Set<Integer> candidates) throws IOException {
- synchronized (indexLock) {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- long nextId = rootId;
- while (nextId != EntryLocation.NOT_SET) {
- EntryLocation entry = getNext(tx, nextId);
- if (entry != null) {
- candidates.remove(entry.getLocation().getDataFileId());
- nextId = entry.getNext();
- } else {
- break;
- }
- }
- }
- });
+ private final class PListIterator implements Iterator<PListEntry> {
+ final Iterator<Map.Entry<String, Location>> iterator;
+ final Transaction tx;
+
+ PListIterator() throws IOException {
+ tx = store.pageFile.tx();
+ this.iterator = iterator(tx);
}
- }
- boolean remove(Transaction tx, String id) throws IOException {
- boolean result = false;
- long nextId = this.rootId;
- while (nextId != EntryLocation.NOT_SET) {
- EntryLocation entry = getNext(tx, nextId);
- if (entry != null) {
- if (entry.getId().equals(id)) {
- result = doRemove(tx, entry);
- break;
- }
- nextId = entry.getNext();
- } else {
- // not found
- break;
- }
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
}
- return result;
- }
- boolean remove(Transaction tx, int position) throws IOException {
- boolean result = false;
- long nextId = this.rootId;
- int count = 0;
- while (nextId != EntryLocation.NOT_SET) {
- EntryLocation entry = getNext(tx, nextId);
- if (entry != null) {
- if (count == position) {
- result = doRemove(tx, entry);
- break;
- }
- nextId = entry.getNext();
- } else {
- // not found
- break;
+ @Override
+ public PListEntry next() {
+ Map.Entry<String, Location> entry = iterator.next();
+ ByteSequence bs = null;
+ try {
+ bs = store.getPayload(entry.getValue());
+ } catch (IOException unexpected) {
+ NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
+ e.initCause(unexpected);
+ throw e;
}
- count++;
+ return new PListEntry(entry.getKey(), bs);
}
- return result;
- }
- EntryLocation get(Transaction tx, int position) throws IOException {
- EntryLocation result = null;
- long nextId = this.rootId;
- int count = -1;
- while (nextId != EntryLocation.NOT_SET) {
- EntryLocation entry = getNext(tx, nextId);
- if (entry != null) {
- if (count == position) {
- result = entry;
- break;
+ @Override
+ public void remove() {
+ try {
+ synchronized (indexLock) {
+ tx.execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ iterator.remove();
+ }
+ });
}
- nextId = entry.getNext();
- } else {
- break;
+ } catch (IOException unexpected) {
+ IllegalStateException e = new IllegalStateException(unexpected);
+ e.initCause(unexpected);
+ throw e;
}
- count++;
- }
- return result;
- }
-
- EntryLocation getFirst(Transaction tx) throws IOException {
- long offset = getRoot(tx).getNext();
- if (offset != EntryLocation.NOT_SET) {
- return loadEntry(tx, offset);
- }
- return null;
- }
-
- EntryLocation getLast(Transaction tx) throws IOException {
- if (this.lastId != EntryLocation.NOT_SET) {
- return loadEntry(tx, this.lastId);
}
- return null;
}
- private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
- boolean result = false;
- if (entry != null) {
-
- EntryLocation prev = getPrevious(tx, entry.getPrev());
- EntryLocation next = getNext(tx, entry.getNext());
- long prevId = prev != null ? prev.getPage().getPageId() : this.rootId;
- long nextId = next != null ? next.getPage().getPageId() : EntryLocation.NOT_SET;
-
- if (next != null) {
- next.setPrev(prevId);
- storeEntry(tx, next);
- } else {
- // we are deleting the last one in the list
- this.lastId = prevId;
- }
- if (prev != null) {
- prev.setNext(nextId);
- storeEntry(tx, prev);
+ public void claimFileLocations(final Set<Integer> candidates) throws IOException {
+ synchronized (indexLock) {
+ if (loaded.get()) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
+ while (iterator.hasNext()) {
+ Location location = iterator.next().getValue();
+ candidates.remove(location.getDataFileId());
+ }
+ }
+ });
}
-
- entry.reset();
- storeEntry(tx, entry);
- tx.free(entry.getPage().getPageId());
- result = true;
- this.size--;
- }
- return result;
- }
-
- private EntryLocation createEntry(Transaction tx, String id, long previous, long next) throws IOException {
- Page<EntryLocation> p = tx.allocate();
- EntryLocation result = new EntryLocation();
- result.setPage(p);
- p.set(result);
- result.setId(id);
- result.setPrev(previous);
- result.setNext(next);
- return result;
- }
-
- private EntryLocation createEntry(Page<EntryLocation> p, String id, long previous, long next) throws IOException {
- EntryLocation result = new EntryLocation();
- result.setPage(p);
- p.set(result);
- result.setId(id);
- result.setPrev(previous);
- result.setNext(next);
- return result;
- }
-
- EntryLocation loadEntry(Transaction tx, long pageId) throws IOException {
- Page<EntryLocation> page = tx.load(pageId, EntryLocationMarshaller.INSTANCE);
- EntryLocation entry = page.get();
- if (entry != null) {
- entry.setPage(page);
- }
- return entry;
- }
-
- private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
- tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
- }
-
- EntryLocation getNext(Transaction tx, long next) throws IOException {
- EntryLocation result = null;
- if (next != EntryLocation.NOT_SET) {
- result = loadEntry(tx, next);
}
- return result;
- }
-
- private EntryLocation getPrevious(Transaction tx, long previous) throws IOException {
- EntryLocation result = null;
- if (previous != EntryLocation.NOT_SET) {
- result = loadEntry(tx, previous);
- }
- return result;
- }
-
- private EntryLocation getRoot(Transaction tx) throws IOException {
- EntryLocation result = loadEntry(tx, this.rootId);
- return result;
}
- ByteSequence getPayload(EntryLocation entry) throws IOException {
- return this.store.getPayload(entry.getLocation());
+ @Override
+ public String toString() {
+ return "" + name + ",[headPageId=" + headPageId + ",tailPageId=" + tailPageId + ", size=" + size() + "]";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListEntry.java Thu Jun 2 15:28:30 2011
@@ -21,39 +21,22 @@ import org.apache.kahadb.util.ByteSequen
public class PListEntry {
private final ByteSequence byteSequence;
- private final EntryLocation entry;
+ private final String entry;
- PListEntry(EntryLocation entry, ByteSequence bs) {
+ PListEntry(String entry, ByteSequence bs) {
this.entry = entry;
this.byteSequence = bs;
}
- /**
- * @return the byteSequence
- */
public ByteSequence getByteSequence() {
return this.byteSequence;
}
public String getId() {
- return this.entry.getId();
- }
-
- /**
- * @return the entry
- */
- EntryLocation getEntry() {
return this.entry;
}
public PListEntry copy() {
return new PListEntry(this.entry, this.byteSequence);
}
-
- @Override
- public String toString() {
- return this.entry.getId() + "[pageId=" + this.entry.getPage().getPageId() + ",next=" + this.entry.getNext()
- + "]";
- }
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Thu Jun 2 15:28:30 2011
@@ -73,6 +73,10 @@ public class PListStore extends ServiceS
private Scheduler scheduler;
private long cleanupInterval = 30000;
+ private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
+ private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
+ private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+
public Object getIndexLock() {
return indexLock;
}
@@ -82,6 +86,30 @@ public class PListStore extends ServiceS
this.scheduler = brokerService.getScheduler();
}
+ public int getIndexPageSize() {
+ return indexPageSize;
+ }
+
+ public int getIndexCacheSize() {
+ return indexCacheSize;
+ }
+
+ public int getIndexWriteBatchSize() {
+ return indexWriteBatchSize;
+ }
+
+ public void setIndexPageSize(int indexPageSize) {
+ this.indexPageSize = indexPageSize;
+ }
+
+ public void setIndexCacheSize(int indexCacheSize) {
+ this.indexCacheSize = indexCacheSize;
+ }
+
+ public void setIndexWriteBatchSize(int indexWriteBatchSize) {
+ this.indexWriteBatchSize = indexWriteBatchSize;
+ }
+
protected class MetaData {
protected MetaData(PListStore store) {
this.store = store;
@@ -89,34 +117,34 @@ public class PListStore extends ServiceS
private final PListStore store;
Page<MetaData> page;
- BTreeIndex<String, PList> storedSchedulers;
+ BTreeIndex<String, PList> lists;
void createIndexes(Transaction tx) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
+ this.lists = new BTreeIndex<String, PList>(pageFile, tx.allocate().getPageId());
}
void load(Transaction tx) throws IOException {
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.storedSchedulers.load(tx);
+ this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
+ this.lists.setValueMarshaller(new PListMarshaller(this.store));
+ this.lists.load(tx);
}
- void loadLists(Transaction tx, Map<String, PList> schedulers) throws IOException {
- for (Iterator<Entry<String, PList>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
+ void loadLists(Transaction tx, Map<String, PList> lists) throws IOException {
+ for (Iterator<Entry<String, PList>> i = this.lists.iterator(tx); i.hasNext();) {
Entry<String, PList> entry = i.next();
entry.getValue().load(tx);
- schedulers.put(entry.getKey(), entry.getValue());
+ lists.put(entry.getKey(), entry.getValue());
}
}
public void read(DataInput is) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, PList>(pageFile, is.readLong());
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+ this.lists = new BTreeIndex<String, PList>(pageFile, is.readLong());
+ this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
+ this.lists.setValueMarshaller(new PListMarshaller(this.store));
}
public void write(DataOutput os) throws IOException {
- os.writeLong(this.storedSchedulers.getPageId());
+ os.writeLong(this.lists.getPageId());
}
}
@@ -137,29 +165,9 @@ public class PListStore extends ServiceS
}
}
- class ValueMarshaller extends VariableMarshaller<List<EntryLocation>> {
- public List<EntryLocation> readPayload(DataInput dataIn) throws IOException {
- List<EntryLocation> result = new ArrayList<EntryLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- EntryLocation jobLocation = new EntryLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- public void writePayload(List<EntryLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (EntryLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
- }
-
- class JobSchedulerMarshaller extends VariableMarshaller<PList> {
+ class PListMarshaller extends VariableMarshaller<PList> {
private final PListStore store;
- JobSchedulerMarshaller(PListStore store) {
+ PListMarshaller(PListStore store) {
this.store = store;
}
public PList readPayload(DataInput dataIn) throws IOException {
@@ -168,8 +176,8 @@ public class PListStore extends ServiceS
return result;
}
- public void writePayload(PList js, DataOutput dataOut) throws IOException {
- js.write(dataOut);
+ public void writePayload(PList list, DataOutput dataOut) throws IOException {
+ list.write(dataOut);
}
}
@@ -207,9 +215,9 @@ public class PListStore extends ServiceS
pl.setName(name);
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- pl.setRootId(tx.allocate().getPageId());
+ pl.setHeadPageId(tx.allocate().getPageId());
pl.load(tx);
- metaData.storedSchedulers.put(tx, name, pl);
+ metaData.lists.put(tx, name, pl);
}
});
result = pl;
@@ -236,8 +244,8 @@ public class PListStore extends ServiceS
if (result) {
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- metaData.storedSchedulers.remove(tx, name);
- pl.destroy(tx);
+ metaData.lists.remove(tx, name);
+ pl.destroy();
}
});
}
@@ -261,6 +269,9 @@ public class PListStore extends ServiceS
this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
this.journal.start();
this.pageFile = new PageFile(directory, "tmpDB");
+ this.pageFile.setPageSize(getIndexPageSize());
+ this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
+ this.pageFile.setPageCacheSize(getIndexCacheSize());
this.pageFile.load();
this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
@@ -310,7 +321,7 @@ public class PListStore extends ServiceS
}
}
for (PList pl : this.persistentLists.values()) {
- pl.unload();
+ pl.unload(null);
}
if (this.pageFile != null) {
this.pageFile.unload();
@@ -351,20 +362,13 @@ public class PListStore extends ServiceS
}
}
- private void claimCandidates(PListEntry entry, Set<Integer> candidates) {
- EntryLocation location = entry.getEntry();
- if (location != null) {
- candidates.remove(location.getLocation().getDataFileId());
- }
- }
-
- synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+ ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
ByteSequence result = null;
result = this.journal.read(location);
return result;
}
- synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+ Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
return this.journal.write(payload, sync);
}
@@ -440,7 +444,8 @@ public class PListStore extends ServiceS
@Override
public String toString() {
- return "PListStore:" + this.directory;
+ String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
+ return "PListStore:[" + path + " ]";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Thu Jun 2 15:28:30 2011
@@ -16,13 +16,18 @@
*/
package org.apache.activemq.transport.stomp;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
+import javax.net.ServerSocketFactory;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.xbean.XBeanBrokerService;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java Thu Jun 2 15:28:30 2011
@@ -284,7 +284,10 @@ public abstract class Usage<T extends Us
@Override
public String toString() {
- return "Usage(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + retrieveUsage() + " limit=" + limiter.getLimit() + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
+ return "Usage(" + getName() + ") percentUsage=" + percentUsage
+ + "%, usage=" + retrieveUsage() + ", limit=" + limiter.getLimit()
+ + ", percentUsageMinDelta=" + percentUsageMinDelta + "%"
+ + (parent != null ? ";Parent:" + parent.toString() : "");
}
@SuppressWarnings("unchecked")
Added: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java?rev=1130607&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java Thu Jun 2 15:28:30 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.kahadb.journal.Location;
+
+public class LocationMarshaller implements Marshaller<Location> {
+ public final static LocationMarshaller INSTANCE = new LocationMarshaller();
+
+ public Location readPayload(DataInput dataIn) throws IOException {
+ Location rc = new Location();
+ rc.setDataFileId(dataIn.readInt());
+ rc.setOffset(dataIn.readInt());
+ return rc;
+ }
+
+ public void writePayload(Location object, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(object.getDataFileId());
+ dataOut.writeInt(object.getOffset());
+ }
+
+ public int getFixedSize() {
+ return 8;
+ }
+
+ public Location deepCopy(Location source) {
+ return new Location(source);
+ }
+
+ public boolean isDeepCopySupported() {
+ return true;
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/kahadb/util/LocationMarshaller.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java Thu Jun 2 15:28:30 2011
@@ -184,6 +184,18 @@ public class XARecoveryBrokerTest extend
DataArrayResponse dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
+ // ensure we can close a connection with prepared transactions
+ connection.request(closeConnectionInfo(connectionInfo));
+
+ // open again to deliver outcome
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ connection.send(consumerInfo);
+
// Commit the prepared transactions.
for (int i = 0; i < dar.getData().length; i++) {
connection.send(createCommitTransaction2Phase(connectionInfo, (TransactionId)dar.getData()[i]));
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTest.java Thu Jun 2 15:28:30 2011
@@ -17,23 +17,42 @@
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.IndirectMessageReference;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.kahadb.plist.PList;
import org.apache.activemq.usage.SystemUsage;
+import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.util.ByteSequence;
-import org.junit.Before;
+import org.junit.After;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class FilePendingMessageCursorTest {
-
+ private static final Logger LOG = LoggerFactory.getLogger(FilePendingMessageCursorTest.class);
BrokerService brokerService;
FilePendingMessageCursor underTest;
- @Before
- public void createBrokerWithTempStoreLimit() throws Exception {
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.getTempDataStore().stop();
+ }
+ }
+
+ private void createBrokerWithTempStoreLimit() throws Exception {
brokerService = new BrokerService();
SystemUsage usage = brokerService.getSystemUsage();
usage.getTempUsage().setLimit(1025*1024*15);
@@ -45,7 +64,7 @@ public class FilePendingMessageCursorTes
@Test
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
-
+ createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
@@ -57,4 +76,60 @@ public class FilePendingMessageCursorTes
assertFalse("cursor is not full", underTest.isFull());
}
+
+ @Test
+ public void testAddRemoveAddIndexSize() throws Exception {
+ brokerService = new BrokerService();
+ SystemUsage usage = brokerService.getSystemUsage();
+ usage.getMemoryUsage().setLimit(1024*150);
+ String body = new String(new byte[1024]);
+ Destination destination = new Queue(brokerService, new ActiveMQQueue("Q"), null, new DestinationStatistics(), null);
+
+ underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
+ underTest.setSystemUsage(usage);
+
+ LOG.info("start");
+ final PageFile pageFile = underTest.getDiskList().getPageFile();
+ LOG.info("page count: " +pageFile.getPageCount());
+ LOG.info("free count: " + pageFile.getFreePageCount());
+ LOG.info("content size: " +pageFile.getPageContentSize());
+
+ final long initialPageCount = pageFile.getPageCount();
+
+ final int numMessages = 1000;
+
+ for (int j=0; j<10; j++) {
+ // ensure free pages are reused
+ for (int i=0; i< numMessages; i++) {
+ ActiveMQMessage mqMessage = new ActiveMQMessage();
+ mqMessage.setStringProperty("body", body);
+ mqMessage.setMessageId(new MessageId("1:2:3:" + i));
+ mqMessage.setMemoryUsage(usage.getMemoryUsage());
+ mqMessage.setRegionDestination(destination);
+ underTest.addMessageLast(new IndirectMessageReference(mqMessage));
+ }
+ assertFalse("cursor is not full " + usage.getTempUsage(), underTest.isFull());
+
+ underTest.reset();
+ long receivedCount = 0;
+ while(underTest.hasNext()) {
+ MessageReference ref = underTest.next();
+ underTest.remove();
+ assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId());
+ }
+ assertEquals("got all messages back", receivedCount, numMessages);
+ LOG.info("page count: " +pageFile.getPageCount());
+ LOG.info("free count: " + pageFile.getFreePageCount());
+ LOG.info("content size: " + pageFile.getPageContentSize());
+ }
+
+ assertEquals("expected page usage", initialPageCount, pageFile.getPageCount() - pageFile.getFreePageCount() );
+
+ LOG.info("Destroy");
+ underTest.destroy();
+ LOG.info("page count: " + pageFile.getPageCount());
+ LOG.info("free count: " + pageFile.getFreePageCount());
+ LOG.info("content size: " + pageFile.getPageContentSize());
+ assertEquals("expected page usage", initialPageCount -1, pageFile.getPageCount() - pageFile.getFreePageCount() );
+ }
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java Thu Jun 2 15:28:30 2011
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.IOException;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Vector;
@@ -80,7 +81,7 @@ public class PListTest {
plist.addFirst(test, bs);
}
assertEquals(plist.size(), COUNT);
- int count = plist.size() - 1;
+ long count = plist.size() - 1;
for (ByteSequence bs : map.values()) {
String origStr = new String(bs.getData(), bs.getOffset(), bs.getLength());
PListEntry entry = plist.get(count);
@@ -107,7 +108,7 @@ public class PListTest {
assertEquals(plist.size(), COUNT);
PListEntry entry = plist.getFirst();
while (entry != null) {
- plist.remove(entry.copy());
+ plist.remove(entry.getId());
entry = plist.getFirst();
}
assertEquals(0,plist.size());
@@ -133,7 +134,6 @@ public class PListTest {
}
plist.destroy();
assertEquals(0,plist.size());
- assertNull("no first entry", plist.getFirst());
}
@Test
@@ -292,47 +292,56 @@ public class PListTest {
store.setCleanupInterval(5000);
store.start();
- final int iterations = 500;
+ final int iterations = 5000;
final int numLists = 10;
// prime the store
// create/delete
+ LOG.info("create");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.CREATE, iterations).run();
}
+ LOG.info("delete");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
- // fill
+ LOG.info("fill");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
- // empty
+ LOG.info("remove");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.REMOVE, iterations).run();
}
- // empty
+
+ LOG.info("check empty");
+ for (int i=0; i<numLists;i++) {
+ assertEquals("empty " + i, 0, store.getPList("List-" + i).size());
+ }
+
+ LOG.info("delete again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.DELETE, iterations).run();
}
- // fill
+ LOG.info("fill again");
for (int i=0; i<numLists;i++) {
new Job(i, PListTest.TaskType.ADD, iterations).run();
}
- // parallel
- ExecutorService executor = Executors.newFixedThreadPool(100);
+ LOG.info("parallel add and remove");
+ ExecutorService executor = Executors.newFixedThreadPool(numLists*2);
for (int i=0; i<numLists*2; i++) {
executor.execute(new Job(i, i>=numLists ? PListTest.TaskType.ADD : PListTest.TaskType.REMOVE, iterations));
}
executor.shutdown();
+ LOG.info("wait for parallel work to complete");
executor.awaitTermination(60*5, TimeUnit.SECONDS);
- assertTrue("no excepitons", exceptions.isEmpty());
+ assertTrue("no exceptions", exceptions.isEmpty());
}
enum TaskType {CREATE, DELETE, ADD, REMOVE, ITERATE}
@@ -373,7 +382,7 @@ public class PListTest {
case REMOVE:
plist = store.getPList("List-" + id);
- for (int j = iterations; j > 0; j--) {
+ for (int j = iterations -1; j >= 0; j--) {
plist.remove(idSeed + "id" + j);
if (j > 0 && j % (iterations / 2) == 0) {
LOG.info("Job-" + id + " Done remove: " + j);
@@ -383,9 +392,10 @@ public class PListTest {
case ITERATE:
plist = store.getPList("List-" + id);
- PListEntry element = plist.getFirst();
- while (element != null) {
- element = plist.getNext(element);
+ Iterator<PListEntry> iterator = plist.iterator();
+ PListEntry element = null;
+ while (iterator.hasNext()) {
+ element = iterator.next();
}
break;
default:
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java?rev=1130607&r1=1130606&r2=1130607&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/ListIndex.java Thu Jun 2 15:28:30 2011
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ public class ListIndex<Key,Value> implem
protected PageFile pageFile;
protected long headPageId;
protected long tailPageId;
- private long size;
+ private AtomicLong size = new AtomicLong(0);
protected AtomicBoolean loaded = new AtomicBoolean();
@@ -43,9 +44,12 @@ public class ListIndex<Key,Value> implem
private Marshaller<Key> keyMarshaller;
private Marshaller<Value> valueMarshaller;
- public ListIndex(PageFile pageFile, long rootPageId) {
+ public ListIndex() {
+ }
+
+ public ListIndex(PageFile pageFile, long headPageId) {
this.pageFile = pageFile;
- this.headPageId = rootPageId;
+ this.headPageId = headPageId;
}
synchronized public void load(Transaction tx) throws IOException {
@@ -61,15 +65,15 @@ public class ListIndex<Key,Value> implem
final Page<ListNode<Key,Value>> p = tx.load(headPageId, null);
if( p.getType() == Page.PAGE_FREE_TYPE ) {
// Need to initialize it..
- ListNode<Key, Value> root = createNode(p, null);
+ ListNode<Key, Value> root = createNode(p);
storeNode(tx, root, true);
- tailPageId = headPageId;
+ tailPageId = headPageId = p.getPageId();
} else {
- ListNode<Key, Value> node = loadNode(tx, headPageId, null);
- size += node.size(tx);
+ ListNode<Key, Value> node = loadNode(tx, headPageId);
+ size.addAndGet(node.size(tx));
while (node.getNext() != -1) {
- node = loadNode(tx, node.getNext(), node);
- size += node.size(tx);
+ node = loadNode(tx, node.getNext());
+ size.addAndGet(node.size(tx));
tailPageId = node.getPageId();
}
}
@@ -82,11 +86,11 @@ public class ListIndex<Key,Value> implem
}
protected ListNode<Key,Value> getHead(Transaction tx) throws IOException {
- return loadNode(tx, headPageId, null);
+ return loadNode(tx, headPageId);
}
protected ListNode<Key,Value> getTail(Transaction tx) throws IOException {
- return loadNode(tx, tailPageId, null);
+ return loadNode(tx, tailPageId);
}
synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
@@ -122,14 +126,14 @@ public class ListIndex<Key,Value> implem
synchronized public Value add(Transaction tx, Key key, Value value) throws IOException {
assertLoaded();
getTail(tx).put(tx, key, value);
- size ++;
+ size.incrementAndGet();
return null;
}
synchronized public Value addFirst(Transaction tx, Key key, Value value) throws IOException {
assertLoaded();
getHead(tx).addFirst(tx, key, value);
- size++;
+ size.incrementAndGet();
return null;
}
@@ -146,7 +150,7 @@ public class ListIndex<Key,Value> implem
}
public void onRemove() {
- size--;
+ size.decrementAndGet();
}
public boolean isTransient() {
@@ -157,8 +161,10 @@ public class ListIndex<Key,Value> implem
for (Iterator<ListNode<Key,Value>> iterator = listNodeIterator(tx); iterator.hasNext(); ) {
ListNode<Key,Value>candidate = iterator.next();
candidate.clear(tx);
+ // break up the transaction
+ tx.commit();
}
- size = 0;
+ size.set(0);
}
synchronized public Iterator<ListNode<Key, Value>> listNodeIterator(Transaction tx) throws IOException {
@@ -173,7 +179,7 @@ public class ListIndex<Key,Value> implem
return getHead(tx).iterator(tx);
}
- synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, int initialPosition) throws IOException {
+ synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, long initialPosition) throws IOException {
return getHead(tx).iterator(tx, initialPosition);
}
@@ -191,29 +197,24 @@ public class ListIndex<Key,Value> implem
}
}
- ListNode<Key,Value> loadNode(Transaction tx, long pageId, ListNode<Key,Value> parent) throws IOException {
+ ListNode<Key,Value> loadNode(Transaction tx, long pageId) throws IOException {
Page<ListNode<Key,Value>> page = tx.load(pageId, marshaller);
ListNode<Key, Value> node = page.get();
node.setPage(page);
- node.setParent(parent);
return node;
}
- ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> p, ListNode<Key,Value> parent) throws IOException {
+ ListNode<Key,Value> createNode(Page<ListNode<Key,Value>> page) throws IOException {
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
- node.setPage(p);
- node.setParent(parent);
- node.setEmpty();
- p.set(node);
+ node.setPage(page);
+ page.set(node);
return node;
}
- ListNode<Key,Value> createNode(Transaction tx, ListNode<Key,Value> parent) throws IOException {
- Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), marshaller);
+ ListNode<Key,Value> createNode(Transaction tx) throws IOException {
+ Page<ListNode<Key,Value>> page = tx.load(tx.<Object>allocate(1).getPageId(), null);
ListNode<Key,Value> node = new ListNode<Key,Value>(this);
node.setPage(page);
- node.setParent(parent);
- node.setEmpty();
page.set(node);
return node;
}
@@ -225,6 +226,11 @@ public class ListIndex<Key,Value> implem
public PageFile getPageFile() {
return pageFile;
}
+
+ public void setPageFile(PageFile pageFile) {
+ this.pageFile = pageFile;
+ }
+
public long getHeadPageId() {
return headPageId;
}
@@ -252,6 +258,6 @@ public class ListIndex<Key,Value> implem
}
public long size() {
- return size;
+ return size.get();
}
}