You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/10/15 21:31:24 UTC
svn commit: r584863 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
kaha/impl/async/ store/ store/amq/ store/kahadaptor/
Author: rajdavies
Date: Mon Oct 15 12:31:22 2007
New Revision: 584863
URL: http://svn.apache.org/viewvc?rev=584863&view=rev
Log:
fix data logs not being correctly removed
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java Mon Oct 15 12:31:22 2007
@@ -270,6 +270,12 @@
storeSize.addAndGet(size);
return currentWriteFile;
}
+
+ public synchronized void removeLocation(Location location) throws IOException{
+
+ DataFile dataFile = getDataFile(location);
+ dataFile.decrement();
+ }
DataFile getDataFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
@@ -346,6 +352,7 @@
synchronized void addInterestInFile(DataFile dataFile) {
if (dataFile != null) {
dataFile.increment();
+ System.err.println("ADD INTEREST: " + dataFile);
}
}
@@ -355,6 +362,7 @@
DataFile dataFile = (DataFile)fileMap.get(key);
removeInterestInFile(dataFile);
}
+
}
synchronized void removeInterestInFile(DataFile dataFile) throws IOException {
@@ -362,24 +370,20 @@
if (dataFile.decrement() <= 0) {
removeDataFile(dataFile);
}
+ System.err.println("REMOVE INTEREST: " + dataFile);
}
}
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse) throws IOException {
-
- // Substract and the difference is the set of files that are no longer
- // needed :)
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
-
List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
DataFile dataFile = (DataFile)fileMap.get(key);
purgeList.add(dataFile);
}
-
for (DataFile dataFile : purgeList) {
- removeDataFile(dataFile);
+ forceRemoveDataFile(dataFile);
}
}
@@ -399,16 +403,20 @@
// Make sure we don't delete too much data.
if (dataFile == currentWriteFile || mark == null || dataFile.getDataFileId() >= mark.getDataFileId()) {
- return;
+ LOG.debug("Won't remove DataFile" + dataFile);
+ return;
}
-
+ forceRemoveDataFile(dataFile);
+ }
+
+ private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
-
- fileMap.remove(dataFile.getDataFileId());
+ DataFile removed = fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
boolean result = dataFile.delete();
- LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
+ LOG.debug("discarding data file " + dataFile
+ + (result ? "successful " : "failed"));
}
@@ -519,7 +527,8 @@
}
public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
- return appender.storeItem(data, Location.USER_TYPE, sync);
+ Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
+ return loc;
}
public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java Mon Oct 15 12:31:22 2007
@@ -66,6 +66,10 @@
public synchronized int decrement() {
return --referenceCount;
}
+
+ public synchronized int getReferenceCount(){
+ return referenceCount;
+ }
public synchronized boolean isUnused() {
return referenceCount <= 0;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAccessorPool.java Mon Oct 15 12:31:22 2007
@@ -59,9 +59,8 @@
return rc;
}
- public void closeDataFileReader(DataFileAccessor reader) {
+ public synchronized void closeDataFileReader(DataFileAccessor reader) {
openCounter--;
- used = true;
if (pool.size() >= maxOpenReadersPerFile || disposed) {
reader.dispose();
} else {
@@ -69,15 +68,15 @@
}
}
- public void clearUsedMark() {
+ public synchronized void clearUsedMark() {
used = false;
}
- public boolean isUsed() {
+ public synchronized boolean isUsed() {
return used;
}
- public void dispose() {
+ public synchronized void dispose() {
for (DataFileAccessor reader : pool) {
reader.dispose();
}
@@ -85,7 +84,7 @@
disposed = true;
}
- public int getOpenCounter() {
+ public synchronized int getOpenCounter() {
return openCounter;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java Mon Oct 15 12:31:22 2007
@@ -17,9 +17,10 @@
package org.apache.activemq.store;
import java.io.IOException;
+
import javax.jms.JMSException;
+
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -31,17 +31,20 @@
*/
public interface TopicReferenceStore extends ReferenceStore, TopicMessageStore {
/**
- * Stores the last acknowledged messgeID for the given subscription so that
+ * Removes the last acknowledged messgeID for the given subscription so that
* we can recover and commence dispatching messages from the last checkpoint
+ * N.B. - all messages previous to this one for a given subscriber
+ * should also be acknowledged
*
* @param context
* @param clientId
* @param subscriptionName
* @param messageId
* @param subscriptionPersistentId
+ * @return true if there are no more references to the message - or the message is null
* @throws IOException
*/
- void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
+ boolean acknowledgeReference(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException;
/**
* @param clientId
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Mon Oct 15 12:31:22 2007
@@ -180,7 +180,7 @@
/**
*/
- public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
+ public void removeMessage(final ConnectionContext context, final MessageAck ack) throws IOException {
JournalQueueAck remove = new JournalQueueAck();
remove.setDestination(destination);
remove.setMessageAck(ack);
@@ -189,7 +189,7 @@
if (debug) {
LOG.debug("Journalled message remove for: " + ack.getLastMessageId() + ", at: " + location);
}
- removeMessage(ack, location);
+ removeMessage(ack,location);
} else {
if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
@@ -206,7 +206,7 @@
}
synchronized (AMQMessageStore.this) {
inFlightTxLocations.remove(location);
- removeMessage(ack, location);
+ removeMessage(ack,location);
}
}
@@ -240,7 +240,7 @@
}
}
}
-
+
public boolean replayRemoveMessage(ConnectionContext context, MessageAck messageAck) {
try {
// Only remove the message if it has not already been removed.
@@ -378,16 +378,28 @@
*
*/
public Message getMessage(MessageId identity) throws IOException {
+ Location location = getLocation(identity);
+ DataStructure rc = peristenceAdapter.readCommand(location);
+ try {
+ return (Message) rc;
+ } catch (ClassCastException e) {
+ throw new IOException("Could not read message " + identity
+ + " at location " + location
+ + ", expected a message, but got: " + rc);
+ }
+ }
+
+ protected Location getLocation(MessageId messageId) throws IOException {
ReferenceData data = null;
synchronized (this) {
// Is it still in flight???
- data = messages.get(identity);
+ data = messages.get(messageId);
if (data == null && cpAddedMessageIds != null) {
- data = cpAddedMessageIds.get(identity);
+ data = cpAddedMessageIds.get(messageId);
}
}
if (data == null) {
- data = referenceStore.getMessageReference(identity);
+ data = referenceStore.getMessageReference(messageId);
if (data == null) {
return null;
}
@@ -395,12 +407,7 @@
Location location = new Location();
location.setDataFileId(data.getFileId());
location.setOffset(data.getOffset());
- DataStructure rc = peristenceAdapter.readCommand(location);
- try {
- return (Message)rc;
- } catch (ClassCastException e) {
- throw new IOException("Could not read message " + identity + " at location " + location + ", expected a message, but got: " + rc);
- }
+ return location;
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Mon Oct 15 12:31:22 2007
@@ -83,7 +83,7 @@
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
- private long cleanupInterval = 1000 * 60;
+ private long cleanupInterval = 1000 * 15;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
private int maxCheckpointMessageAddSize = 1024 * 4;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Mon Oct 15 12:31:22 2007
@@ -25,6 +25,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.impl.async.Location;
@@ -77,7 +78,7 @@
/**
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+ public void acknowledge(final ConnectionContext context, final String clientId, final String subscriptionName, final MessageId messageId) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
@@ -92,7 +93,7 @@
if (debug) {
LOG.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
}
- acknowledge(messageId, location, key);
+ acknowledge(context,messageId, location, clientId,subscriptionName);
} else {
if (debug) {
LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
@@ -109,7 +110,7 @@
}
synchronized (AMQTopicMessageStore.this) {
inFlightTxLocations.remove(location);
- acknowledge(messageId, location, key);
+ acknowledge(context,messageId, location, clientId,subscriptionName);
}
}
@@ -142,44 +143,22 @@
* @param messageId
* @param location
* @param key
- * @throws InterruptedIOException
+ * @throws IOException
*/
- protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
+ protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
synchronized (this) {
lastLocation = location;
- ackedLastAckLocations.put(key, messageId);
+ if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
+ MessageAck ack = new MessageAck();
+ ack.setLastMessageId(messageId);
+ removeMessage(context, ack);
+ }
}
try {
asyncWriteTask.wakeup();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
- }
-
- @Override
- protected Location doAsyncWrite() throws IOException {
- final Map<SubscriptionKey, MessageId> cpAckedLastAckLocations;
- // swap out the hash maps..
- synchronized (this) {
- cpAckedLastAckLocations = this.ackedLastAckLocations;
- this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
- }
- Location location = super.doAsyncWrite();
-
- if (cpAckedLastAckLocations != null) {
- transactionTemplate.run(new Callback() {
- public void execute() throws Exception {
- // Checkpoint the acknowledged messages.
- Iterator<SubscriptionKey> iterator = cpAckedLastAckLocations.keySet().iterator();
- while (iterator.hasNext()) {
- SubscriptionKey subscriptionKey = iterator.next();
- MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
- topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
- }
- }
- });
- }
- return location;
}
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -26,6 +26,7 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
+import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
@@ -120,7 +121,7 @@
}
return result.getData();
}
-
+
public void addReferenceFileIdsInUse() {
for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
.getNext(entry)) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java Mon Oct 15 12:31:22 2007
@@ -117,12 +117,55 @@
return container;
}
- public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId) throws IOException {
+ public synchronized boolean acknowledgeReference(ConnectionContext context,
+ String clientId, String subscriptionName, MessageId messageId)
+ throws IOException {
+ boolean removeMessage = false;
String key = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(key);
if (container != null) {
+ ConsumerMessageRef ref = null;
+ if((ref = container.remove(messageId)) != null) {
+ TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+ if (tsa != null) {
+ if (tsa.decrementCount() <= 0) {
+ StoreEntry entry = ref.getAckEntry();
+ entry = ackContainer.refresh(entry);
+ ackContainer.remove(entry);
+ ReferenceRecord rr = messageContainer.get(messageId);
+ if (rr != null) {
+ entry = tsa.getMessageEntry();
+ entry = messageContainer.refresh(entry);
+ messageContainer.remove(entry);
+ removeInterest(rr);
+ removeMessage = true;
+ }else {
+ System.err.println("REF RTEC OS NULL!!!");
+ }
+ } else {
+ System.out.println("RED XOUVT IAS " + tsa.getCount());
+ ackContainer.update(ref.getAckEntry(), tsa);
+ }
+ }else{
+ System.err.println("NO TAS!!!");
+ }
+ }else{
+ //no message held
+ removeMessage = true;
+ }
+ }
+ return removeMessage;
+
+ }
+
+ public synchronized void acknowledge(ConnectionContext context,
+ String clientId, String subscriptionName, MessageId messageId)
+ throws IOException {
+ String key = getSubscriptionKey(clientId, subscriptionName);
+
+ TopicSubContainer container = subscriberMessages.get(key);
+ if (container != null) {
ConsumerMessageRef ref = container.remove(messageId);
if (ref != null) {
TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
@@ -145,7 +188,7 @@
}
}
}
- }
+ }
public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java?rev=584863&r1=584862&r2=584863&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java Mon Oct 15 12:31:22 2007
@@ -81,6 +81,22 @@
}
return result;
}
+
+ public ConsumerMessageRef removeFirst() {
+ ConsumerMessageRef result = null;
+ if (!listContainer.isEmpty()) {
+ StoreEntry entry = listContainer.getFirst();
+
+ result = (ConsumerMessageRef) listContainer.get(entry);
+ listContainer.remove(entry);
+ if (listContainer != null && batchEntry != null
+ && (listContainer.isEmpty() || batchEntry.equals(entry))) {
+ reset();
+ }
+
+ }
+ return result;
+ }
public ConsumerMessageRef get(StoreEntry entry) {
return (ConsumerMessageRef)listContainer.get(entry);