You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/16 19:30:26 UTC
svn commit: r964866 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/store/
main/java/org/apache/activemq/store/amq/
main/java/org/apache/activemq/store/kahadaptor/
test/java/org/apache/activemq/...
Author: gtully
Date: Fri Jul 16 17:30:25 2010
New Revision: 964866
URL: http://svn.apache.org/viewvc?rev=964866&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2832 - impl for AMQ pa
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java?rev=964866&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java Fri Jul 16 17:30:25 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.kaha.impl.async.Location;
+
+public final class MessageAckWithLocation extends MessageAck {
+ public final Location location;
+
+ public MessageAckWithLocation(MessageAck ack, Location location) {
+ ack.copy(this);
+ this.location = location;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/MessageAckWithLocation.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java Fri Jul 16 17:30:25 2010
@@ -47,7 +47,6 @@ public interface MessageStore extends Se
*
* @param context context
* @param message
- * @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
@@ -59,7 +58,6 @@ public interface MessageStore extends Se
*
* @param context context
* @param message
- * @param l
* @return a Future to track when this is complete
* @throws IOException
* @throws IOException
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Fri Jul 16 17:30:25 2010
@@ -38,6 +38,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -70,7 +71,7 @@ public class AMQMessageStore extends Abs
protected final TaskRunner asyncWriteTask;
protected CountDownLatch flushLatch;
private Map<MessageId, ReferenceData> messages = new LinkedHashMap<MessageId, ReferenceData>();
- private List<MessageAck> messageAcks = new ArrayList<MessageAck>();
+ private List<MessageAckWithLocation> messageAcks = new ArrayList<MessageAckWithLocation>();
/** A MessageStore that we can use to retrieve messages quickly. */
private Map<MessageId, ReferenceData> cpAddedMessageIds;
private final boolean debug = LOG.isDebugEnabled();
@@ -255,7 +256,7 @@ public class AMQMessageStore extends Abs
MessageId id = ack.getLastMessageId();
data = messages.remove(id);
if (data == null) {
- messageAcks.add(ack);
+ messageAcks.add(new MessageAckWithLocation(ack, location));
} else {
// message never got written so datafileReference will still exist
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, data.getFileId());
@@ -350,7 +351,7 @@ public class AMQMessageStore extends Abs
* @throws IOException
*/
protected Location doAsyncWrite() throws IOException {
- final List<MessageAck> cpRemovedMessageLocations;
+ final List<MessageAckWithLocation> cpRemovedMessageLocations;
final List<Location> cpActiveJournalLocations;
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation;
@@ -361,7 +362,7 @@ public class AMQMessageStore extends Abs
cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>();
- this.messageAcks = new ArrayList<MessageAck>();
+ this.messageAcks = new ArrayList<MessageAckWithLocation>();
lastLocation = this.lastLocation;
} finally {
lock.unlock();
@@ -406,7 +407,7 @@ public class AMQMessageStore extends Abs
persitanceAdapter.commitTransaction(context);
persitanceAdapter.beginTransaction(context);
// Checkpoint the removed messages.
- for (MessageAck ack : cpRemovedMessageLocations) {
+ for (MessageAckWithLocation ack : cpRemovedMessageLocations) {
try {
referenceStore.removeMessage(transactionTemplate.getContext(), ack);
} catch (Throwable e) {
@@ -576,5 +577,5 @@ public class AMQMessageStore extends Abs
}
getReferenceStore().setBatch(messageId);
}
-
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Fri Jul 16 17:30:25 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -570,7 +571,6 @@ public class AMQPersistenceAdapter imple
*
* @throws IOException
* @throws IOException
- * @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
@@ -1051,7 +1051,6 @@ public class AMQPersistenceAdapter imple
}
-
protected void lock() throws Exception {
lockLogged = false;
lockAquired = false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java Fri Jul 16 17:30:25 2010
@@ -29,6 +29,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
+import org.apache.activemq.kaha.MessageAckWithLocation;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -203,17 +204,17 @@ public class KahaReferenceStore extends
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
- removeMessage(ack.getLastMessageId());
- }
-
- public void removeMessage(MessageId msgId) throws IOException {
lock.lock();
try {
+ MessageId msgId = ack.getLastMessageId();
StoreEntry entry = messageContainer.getEntry(msgId);
if (entry != null) {
ReferenceRecord rr = messageContainer.remove(msgId);
if (rr != null) {
removeInterest(rr);
+ if (ack instanceof MessageAckWithLocation) {
+ recordAckFileReferences((MessageAckWithLocation)ack, rr.getData().getFileId());
+ }
dispatchAudit.isDuplicate(msgId);
if (LOG.isDebugEnabled()) {
LOG.debug(destination.getPhysicalName() + " remove reference: " + msgId);
@@ -230,12 +231,18 @@ public class KahaReferenceStore extends
}
}
+ private void recordAckFileReferences(MessageAckWithLocation ack, int messageFileId) {
+ adapter.recordAckFileReferences(ack.location.getDataFileId(), messageFileId);
+ }
+
public void removeAllMessages(ConnectionContext context) throws IOException {
lock.lock();
try {
Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
+ MessageAck ack = new MessageAck();
for (MessageId id:tmpSet) {
- removeMessage(id);
+ ack.setLastMessageId(id);
+ removeMessage(null, ack);
}
resetBatching();
messageContainer.clear();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=964866&r1=964865&r2=964866&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Fri Jul 16 17:30:25 2010
@@ -252,7 +252,44 @@ public class KahaReferenceStoreAdapter e
* @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
*/
public synchronized Set<Integer> getReferenceFileIdsInUse() throws IOException {
- return new HashSet<Integer>(recordReferences.keySet());
+ Set inUse = new HashSet<Integer>(recordReferences.keySet());
+
+ Iterator<Map.Entry<Integer, Set<Integer>>> ackReferences = ackMessageFileMap.entrySet().iterator();
+ while (ackReferences.hasNext()) {
+ Map.Entry<Integer, Set<Integer>> ackReference = ackReferences.next();
+ if (!inUse.contains(ackReference.getKey())) {
+ // should we keep this data file
+ for (Integer referencedFileId : ackReference.getValue()) {
+ if (inUse.contains(referencedFileId)) {
+ // keep this ack file
+ inUse.add(ackReference.getKey());
+ LOG.debug("not removing data file: " + ackReference.getKey()
+ + " as contained ack(s) refer to referencedFileId file: " + ackReference.getValue());
+ break;
+ }
+ }
+ }
+ if (!inUse.contains(ackReference.getKey())) {
+ ackReferences.remove();
+ }
+ }
+
+ return inUse;
+ }
+
+ Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+ public synchronized void recordAckFileReferences(int ackDataFileId, int messageFileId) {
+ Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackDataFileId));
+ if (referenceFileIds == null) {
+ referenceFileIds = new HashSet<Integer>();
+ referenceFileIds.add(Integer.valueOf(messageFileId));
+ ackMessageFileMap.put(Integer.valueOf(ackDataFileId), referenceFileIds);
+ } else {
+ Integer id = Integer.valueOf(messageFileId);
+ if (!referenceFileIds.contains(id)) {
+ referenceFileIds.add(id);
+ }
+ }
}
/**
@@ -409,4 +446,6 @@ public class KahaReferenceStoreAdapter e
public void setIndexLoadFactor(int loadFactor) {
this.indexLoadFactor = loadFactor;
}
+
+
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java?rev=964866&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java Fri Jul 16 17:30:25 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
+
+
+public class SparseAckReplayAfterStoreCleanupAMQStoreTest extends AMQ2832Test {
+ @Override
+ protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+ brokerService.setPersistenceFactory(new AMQPersistenceAdapterFactory());
+ AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) brokerService.getPersistenceFactory();
+ // ensure there are a bunch of data files but multiple entries in each
+ factory.setMaxFileLength(1024 * 12);
+ // speed up the test case, checkpoint an cleanup early and often
+ factory.setCheckpointInterval(500);
+ factory.setCleanupInterval(500);
+ factory.setSyncOnWrite(false);
+ if (!deleteAllOnStart) {
+ factory.setForceRecoverReferenceStore(true);
+ }
+ brokerService.getPersistenceAdapter();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/SparseAckReplayAfterStoreCleanupAMQStoreTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date