You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/16 16:19:28 UTC

svn commit: r964804 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadb/ test/java/org/apache/activemq/bugs/

Author: gtully
Date: Fri Jul 16 14:19:27 2010
New Revision: 964804

URL: http://svn.apache.org/viewvc?rev=964804&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2832 - track ack file references to prevent cleanup of data files with acks for inuse message files, impl for kahaDB

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQMessageStore.java Fri Jul 16 14:19:27 2010
@@ -415,7 +415,7 @@ public class AMQMessageStore extends Abs
                 }
             }
         });
-        LOG.debug("Batch update done.");
+        LOG.debug("Batch update done. lastLocation:" + lastLocation);
         lock.lock();
         try {
             cpAddedMessageIds = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Fri Jul 16 14:19:27 2010
@@ -500,6 +500,14 @@ public class KahaDBPersistenceAdapter im
        letter.setDatabaseLockedWaitDelay(databaseLockedWaitDelay);
     }
 
+    public boolean getForceRecoverIndex() {
+        return letter.getForceRecoverIndex();
+    }
+
+    public void setForceRecoverIndex(boolean forceRecoverIndex) {
+        letter.setForceRecoverIndex(forceRecoverIndex);
+    }
+
     @Override
     public String toString() {
         String path = getDirectory() != null ? getDirectory().toString() : "DIRECTORY_NOT_SET";

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul 16 14:19:27 2010
@@ -290,6 +290,14 @@ public class KahaDBStore extends Message
         return this.transactionStore;
     }
 
+    public boolean getForceRecoverIndex() {
+        return this.forceRecoverIndex;
+    }
+
+    public void setForceRecoverIndex(boolean forceRecoverIndex) {
+        this.forceRecoverIndex = forceRecoverIndex;
+    }
+
     public class KahaDBMessageStore extends AbstractMessageStore {
         protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=964804&r1=964803&r2=964804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Jul 16 14:19:27 2010
@@ -197,7 +197,7 @@ public class MessageDatabase extends Ser
     private boolean checkForCorruptJournalFiles = false;
     private boolean checksumJournalFiles = false;
     private int databaseLockedWaitDelay = DEFAULT_DATABASE_LOCKED_WAIT_DELAY;
-    
+    protected boolean forceRecoverIndex = false;
 
     public MessageDatabase() {
     }
@@ -428,7 +428,7 @@ public class MessageDatabase extends Ser
 	            
 	        if (recoveryPosition != null) {  
 	            int redoCounter = 0;
-	            LOG.info("Recoverying from the journal ...");
+	            LOG.info("Recovering from the journal ...");
 	            while (recoveryPosition != null) {
 	                JournalCommand<?> message = load(recoveryPosition);
 	                metadata.lastUpdate = recoveryPosition;
@@ -653,18 +653,20 @@ public class MessageDatabase extends Ser
     }
     
     private Location getRecoveryPosition() throws IOException {
+
+        if (!this.forceRecoverIndex) {
+
+            // If we need to recover the transactions..
+            if (metadata.firstInProgressTransactionLocation != null) {
+                return metadata.firstInProgressTransactionLocation;
+            }
         
-        // If we need to recover the transactions..
-        if (metadata.firstInProgressTransactionLocation != null) {
-            return metadata.firstInProgressTransactionLocation;
-        }
-        
-        // Perhaps there were no transactions...
-        if( metadata.lastUpdate!=null) {
-            // Start replay at the record after the last one recorded in the index file.
-            return journal.getNextLocation(metadata.lastUpdate);
+            // Perhaps there were no transactions...
+            if( metadata.lastUpdate!=null) {
+                // Start replay at the record after the last one recorded in the index file.
+                return journal.getNextLocation(metadata.lastUpdate);
+            }
         }
-	    
         // This loads the first position.
         return journal.getNextLocation(null);
 	}
@@ -1008,6 +1010,7 @@ public class MessageDatabase extends Ser
                 if (keys != null) {
                     sd.locationIndex.remove(tx, keys.location);
                 }
+                recordAckMessageReferenceLocation(ackLocation, keys.location);
             }
         } else {
             // In the topic case we need remove the message once it's been acked
@@ -1029,6 +1032,21 @@ public class MessageDatabase extends Ser
         }
     }
 
+    Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
+    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
+        Set<Integer> referenceFileIds = ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
+        if (referenceFileIds == null) {
+            referenceFileIds = new HashSet<Integer>();
+            referenceFileIds.add(messageLocation.getDataFileId());
+            ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
+        } else {
+            Integer id = Integer.valueOf(messageLocation.getDataFileId());
+            if (!referenceFileIds.contains(id)) {
+                referenceFileIds.add(id);
+            }
+        }
+    }
+
     void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
         StoredDestination sd = getStoredDestination(command.getDestination(), tx);
         sd.orderIndex.clear(tx);
@@ -1170,6 +1188,28 @@ public class MessageDatabase extends Ser
                 });
             }
 
+            // check we are not deleting file with ack for in-use journal files
+            Iterator<Integer> candidates = gcCandidateSet.iterator();
+            while (candidates.hasNext()) {
+                Integer candidate = candidates.next();
+                Set<Integer> referencedFileIds = ackMessageFileMap.get(candidate);
+                if (referencedFileIds != null) {
+                    for (Integer referencedFileId : referencedFileIds) {
+                        if (journal.getFileMap().containsKey(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
+                            // active file that is not targeted for deletion is referenced so don't delete
+                            candidates.remove();
+                            break;
+                        }
+                    }
+                    if (gcCandidateSet.contains(candidate)) {
+                        ackMessageFileMap.remove(candidate);
+                    } else {
+                        LOG.debug("not removing data file: " + candidate
+                                + " as contained ack(s) refer to referenced file: " + referencedFileIds);
+                    }
+                }
+            }
+
             if( !gcCandidateSet.isEmpty() ) {
 	            LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
 	            journal.removeDataFiles(gcCandidateSet);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java?rev=964804&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java Fri Jul 16 14:19:27 2010
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AMQ2832Test {
+
+    private static final Log LOG = LogFactory.getLog(AMQ2832Test.class);
+
+    BrokerService broker = null;
+    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+
+    protected void startBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:0");
+
+        configurePersistence(broker, delete);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(500);
+        adapter.setCleanupInterval(500);
+
+        if (!deleteAllOnStart) {
+            adapter.setForceRecoverIndex(true);
+        }
+
+    }
+
+    @Test
+    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
+
+        startBroker(true);
+
+        StagedConsumer consumer = new StagedConsumer();
+        int numMessagesAvailable = produceMessagesToConsumeMultipleDataFiles(20);
+        // this will block the reclaiming of one data file
+        Message firstUnacked = consumer.receive(10);
+        LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
+        Message secondUnacked = consumer.receive(1);
+        LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
+        numMessagesAvailable -= 11;
+
+        numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+        // ensure ack is another data file
+        LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
+        firstUnacked.acknowledge();
+
+        numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+
+        consumer.receive(numMessagesAvailable).acknowledge();
+
+        // second unacked should keep first data file available but journal with the first ack
+        // may get whacked
+        consumer.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        startBroker(false);
+
+        consumer = new StagedConsumer();     
+        // need to force recovery?
+
+        Message msg = consumer.receive(1, 5);
+        assertNotNull("One messages left after recovery", msg);
+        msg.acknowledge();
+
+        // should be no more messages
+        msg = consumer.receive(1, 5);
+        assertEquals("Only one messages left after recovery: " + msg, null, msg);
+        consumer.close();
+
+    }
+
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {
+        int sent = 0;
+        Connection connection = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numToSend; i++) {
+                producer.send(createMessage(session, i));
+                sent++;
+            }
+        } finally {
+            connection.close();
+        }
+        
+        return sent;
+    }
+
+    final String payload = new String(new byte[1024]);
+
+    private Message createMessage(Session session, int i) throws Exception {
+        return session.createTextMessage(payload + "::" + i);
+    }
+
+    private class StagedConsumer {
+        Connection connection;
+        MessageConsumer consumer;
+
+        StagedConsumer() throws Exception {
+            connection = new ActiveMQConnectionFactory("failover://" +
+                    broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection();
+            connection.start();
+            consumer = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination);
+        }
+
+        public Message receive(int numToReceive) throws Exception {
+            return receive(numToReceive, 2);
+        }
+
+        public Message receive(int numToReceive, int timeoutInSeconds) throws Exception {
+            Message msg = null;
+            for (; numToReceive > 0; numToReceive--) {
+
+                do  {
+                    msg = consumer.receive(1*1000);
+                } while (msg == null && --timeoutInSeconds > 0);
+
+                if (numToReceive > 1) {
+                    msg.acknowledge();
+                }
+
+                if (msg != null) {
+                    LOG.debug("received: " + msg.getJMSMessageID());
+                }
+            }
+            // last message, unacked
+            return msg;
+        }
+
+        void close() throws JMSException {
+            consumer.close();
+            connection.close();
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date