You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/16 16:19:28 UTC
svn commit: r964804 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/amq/
main/java/org/apache/activemq/store/kahadb/
test/java/org/apache/activemq/bugs/
Author: gtully
Date: Fri Jul 16 14:19:27 2010
New Revision: 964804
URL: http://svn.apache.org/viewvc?rev=964804&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2832 - track ack file references to prevent cleanup of data files with acks for inuse message files, impl for kahaDB
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Fri Jul 16 14:19:27 2010
@@ -415,7 +415,7 @@ public class AMQMessageStore extends Abs
}
}
});
- LOG.debug("Batch update done.");
+ LOG.debug("Batch update done. lastLocation:" + lastLocation);
lock.lock();
try {
cpAddedMessageIds = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Jul 16 14:19:27 2010
@@ -500,6 +500,14 @@ public class KahaDBPersistenceAdapter im
letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
}
+ public boolean getForceRecoverIndex() {
+ return letter.getForceRecoverIndex();
+ }
+
+ public void setForceRecoverIndex(boolean forceRecoverIndex) {
+ letter.setForceRecoverIndex(forceRecoverIndex);
+ }
+
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul 16 14:19:27 2010
@@ -290,6 +290,14 @@ public class KahaDBStore extends Message
return this.transactionStore;
}
+ public boolean getForceRecoverIndex() {
+ return this.forceRecoverIndex;
+ }
+
+ public void setForceRecoverIndex(boolean forceRecoverIndex) {
+ this.forceRecoverIndex = forceRecoverIndex;
+ }
+
public class KahaDBMessageStore extends AbstractMessageStore {
protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Jul 16 14:19:27 2010
@@ -197,7 +197,7 @@ public class MessageDatabase extends Ser
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
-
+ protected boolean forceRecoverIndex = false;
public MessageDatabase() {
}
@@ -428,7 +428,7 @@ public class MessageDatabase extends Ser
if (recoveryPosition != null) {
int redoCounter = 0;
- LOG.info("Recoverying from the journal ...");
+ LOG.info("Recovering from the journal ...");
while (recoveryPosition != null) {
JournalCommand<?> message = load(recoveryPosition);
metadata.lastUpdate = recoveryPosition;
@@ -653,18 +653,20 @@ public class MessageDatabase extends Ser
}
private Location getRecoveryPosition() throws IOException {
+
+ if (!this.forceRecoverIndex) {
+
+ // If we need to recover the transactions..
+ if (metadata.firstInProgressTransactionLocation != null) {
+ return metadata.firstInProgressTransactionLocation;
+ }
- // If we need to recover the transactions..
- if (metadata.firstInProgressTransactionLocation != null) {
- return metadata.firstInProgressTransactionLocation;
- }
-
- // Perhaps there were no transactions...
- if( metadata.lastUpdate!=null) {
- // Start replay at the record after the last one recorded in the index file.
- return journal.getNextLocation(metadata.lastUpdate);
+ // Perhaps there were no transactions...
+ if( metadata.lastUpdate!=null) {
+ // Start replay at the record after the last one recorded in the index file.
+ return journal.getNextLocation(metadata.lastUpdate);
+ }
}
-
// This loads the first position.
return journal.getNextLocation(null);
}
@@ -1008,6 +1010,7 @@ public class MessageDatabase extends Ser
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
}
+ recordAckMessageReferenceLocation(ackLocation, keys.location);
}
} else {
// In the topic case we need remove the message once it's been acked
@@ -1029,6 +1032,21 @@ public class MessageDatabase extends Ser
}
}
+ Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+ private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
+ Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
+ if (referenceFileIds == null) {
+ referenceFileIds = new HashSet<Integer>();
+ referenceFileIds.add(messageLocation.getDataFileId());
+ ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
+ } else {
+ Integer id = Integer.valueOf(messageLocation.getDataFileId());
+ if (!referenceFileIds.contains(id)) {
+ referenceFileIds.add(id);
+ }
+ }
+ }
+
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
@@ -1170,6 +1188,28 @@ public class MessageDatabase extends Ser
});
}
+ // check we are not deleting file with ack for in-use journal files
+ Iterator<Integer> candidates = gcCandidateSet.iterator();
+ while (candidates.hasNext()) {
+ Integer candidate = candidates.next();
+ Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
+ if (referencedFileIds != null) {
+ for (Integer referencedFileId : referencedFileIds) {
+ if (journal.getFileMap().containsKey(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
+ // active file that is not targeted for deletion is referenced so don't delete
+ candidates.remove();
+ break;
+ }
+ }
+ if (gcCandidateSet.contains(candidate)) {
+ ackMessageFileMap.remove(candidate);
+ } else {
+ LOG.debug("not removing data file: " + candidate
+ + " as contained ack(s) refer to referenced file: " + referencedFileIds);
+ }
+ }
+ }
+
if( !gcCandidateSet.isEmpty() ) {
LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
journal.removeDataFiles(gcCandidateSet);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java?rev=964804&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java Fri Jul 16 14:19:27 2010
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ2832Test {
+
+ private static final Log LOG = LogFactory.getLog(AMQ2832Test.class);
+
+ BrokerService broker = null;
+ private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+
+ protected void startBroker(boolean delete) throws Exception {
+ broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(delete);
+ broker.setPersistent(true);
+ broker.setUseJmx(false);
+ broker.addConnector("tcp://localhost:0");
+
+ configurePersistence(broker, delete);
+
+ broker.start();
+ LOG.info("Starting broker..");
+ }
+
+ protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+ KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+ // ensure there are a bunch of data files but multiple entries in each
+ adapter.setJournalMaxFileLength(1024 * 20);
+
+ // speed up the test case, checkpoint an cleanup early and often
+ adapter.setCheckpointInterval(500);
+ adapter.setCleanupInterval(500);
+
+ if (!deleteAllOnStart) {
+ adapter.setForceRecoverIndex(true);
+ }
+
+ }
+
+ @Test
+ public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
+
+ startBroker(true);
+
+ StagedConsumer consumer = new StagedConsumer();
+ int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
+ // this will block the reclaiming of one data file
+ Message firstUnacked = consumer.receive(10);
+ LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
+ Message secondUnacked = consumer.receive(1);
+ LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
+ numMessagesAvailable -= 11;
+
+ numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+ // ensure ack is another data file
+ LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
+ firstUnacked.acknowledge();
+
+ numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+
+ consumer.receive(numMessagesAvailable).acknowledge();
+
+ // second unacked should keep first data file available but journal with the first ack
+ // may get whacked
+ consumer.close();
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ startBroker(false);
+
+ consumer = new StagedConsumer();
+ // need to force recovery?
+
+ Message msg = consumer.receive(1, 5);
+ assertNotNull("One messages left after recovery", msg);
+ msg.acknowledge();
+
+ // should be no more messages
+ msg = consumer.receive(1, 5);
+ assertEquals("Only one messages left after recovery: " + msg, null, msg);
+ consumer.close();
+
+ }
+
+ private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+ int sent = 0;
+ Connection connection = new ActiveMQConnectionFactory(
+ broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+ connection.start();
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < numToSend; i++) {
+ producer.send(createMessage(session, i));
+ sent++;
+ }
+ } finally {
+ connection.close();
+ }
+
+ return sent;
+ }
+
+ final String payload = new String(new byte[1024]);
+
+ private Message createMessage(Session session, int i) throws Exception {
+ return session.createTextMessage(payload + "::" + i);
+ }
+
+ private class StagedConsumer {
+ Connection connection;
+ MessageConsumer consumer;
+
+ StagedConsumer() throws Exception {
+ connection = new ActiveMQConnectionFactory("failover://" +
+ broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection();
+ connection.start();
+ consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination);
+ }
+
+ public Message receive(int numToReceive) throws Exception {
+ return receive(numToReceive, 2);
+ }
+
+ public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
+ Message msg = null;
+ for (; numToReceive > 0; numToReceive--) {
+
+ do {
+ msg = consumer.receive(1*1000);
+ } while (msg == null && --timeoutInSeconds > 0);
+
+ if (numToReceive > 1) {
+ msg.acknowledge();
+ }
+
+ if (msg != null) {
+ LOG.debug("received: " + msg.getJMSMessageID());
+ }
+ }
+ // last message, unacked
+ return msg;
+ }
+
+ void close() throws JMSException {
+ consumer.close();
+ connection.close();
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date